korlov42 commented on code in PR #1134:
URL: https://github.com/apache/ignite-3/pull/1134#discussion_r987683750
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +87,76 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/** Constructor. */
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>>
registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
Review Comment:
what about recovery after restart?
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java:
##########
@@ -139,4 +147,103 @@ public static boolean equalSchemas(SchemaDescriptor exp,
SchemaDescriptor actual
return true;
}
+
+ /**
+ * Forms schema history key.
+ *
+ * @param tblId Table id.
+ * @param ver Schema version.
+ * @return {@link ByteArray} representation.
+ */
+ public static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {
+ return ByteArray.fromString(tblId + SCHEMA_STORE_PREDICATE + ver);
+ }
+
+ /**
+ * Forms schema history predicate.
+ *
+ * @param tblId Table id.
+ * @return {@link ByteArray} representation.
+ */
+ public static ByteArray schemaHistPredicate(UUID tblId) {
+ return ByteArray.fromString(tblId + SCHEMA_STORE_PREDICATE);
+ }
+
+ /**
+ * Gets the latest version of the table schema which available in
Metastore.
+ *
+ * @param tblId Table id.
+ * @param metastorageMgr Metastorage manager.
+ * @return The latest schema version.
+ */
+ public static int latestSchemaVersion(MetaStorageManager metastorageMgr,
UUID tblId) {
+ try {
+ Cursor<Entry> cur =
metastorageMgr.prefix(schemaHistPredicate(tblId));
+
+ int lastVer = INITIAL_SCHEMA_VERSION;
+
+ for (Entry ent : cur) {
+ String key = ent.key().toString();
+ int pos = key.indexOf(':');
+ assert pos != -1 : "Unexpected key: " + key;
+
+ key = key.substring(pos + 1);
+ int descVer = Integer.parseInt(key);
+
+ if (descVer > lastVer) {
+ lastVer = descVer;
+ }
+ }
+
+ return lastVer;
+ } catch (NoSuchElementException e) {
+ assert false : "Table must exist. [tableId=" + tblId + ']';
+
+ return INITIAL_SCHEMA_VERSION;
+ } catch (NodeStoppingException e) {
+ throw new IgniteException(e.traceId(), e.code(), e.getMessage(),
e);
+ }
+ }
+
+ /**
+ * Gets the latest serialized schema of the table which available in
Metastore.
+ *
+ * @param tblId Table id.
+ * @param metastorageMgr Metastorage manager.
+ * @return The latest schema version or {@code null} if not found.
+ */
+ public static @Nullable byte[] schemaById(MetaStorageManager
metastorageMgr, UUID tblId, int ver) {
Review Comment:
```suggestion
public static byte @Nullable [] schemaById(MetaStorageManager
metastorageMgr, UUID tblId, int ver) {
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1226,50 +1205,20 @@ private CompletableFuture<Void>
alterTableAsyncInternal(String name, Consumer<Ta
if (tbl == null) {
tblFut.completeExceptionally(new
TableNotFoundException(DEFAULT_SCHEMA_NAME, name));
} else {
- TableImpl tblImpl = (TableImpl) tbl;
-
tablesCfg.tables().change(ch -> {
if (ch.get(name) == null) {
throw new TableNotFoundException(DEFAULT_SCHEMA_NAME,
name);
}
ch.update(name, tblCh -> {
- tableChange.accept(tblCh);
-
- ((ExtendedTableChange)
tblCh).changeSchemas(schemasCh ->
-
schemasCh.createOrUpdate(String.valueOf(schemasCh.size() + 1), schemaCh -> {
- ExtendedTableView currTableView =
(ExtendedTableView) tablesCfg.tables().get(name).value();
-
- SchemaDescriptor descriptor;
-
- //TODO IGNITE-15747 Remove
try-catch and force configuration validation
- // here to ensure a valid
configuration passed to prepareSchemaDescriptor() method.
- try {
- descriptor =
SchemaUtils.prepareSchemaDescriptor(
- ((ExtendedTableView)
tblCh).schemas().size(),
- tblCh);
-
-
descriptor.columnMapping(SchemaUtils.columnMapper(
-
tblImpl.schemaView().schema(currTableView.schemas().size()),
-
currTableView.columns(),
- descriptor,
- tblCh.columns()));
- } catch (IllegalArgumentException
ex) {
- // Convert unexpected
exceptions here,
- // because validation actually
happens later,
- // when bulk configuration
update is applied.
-
ConfigurationValidationException e =
- new
ConfigurationValidationException(ex.getMessage());
-
- e.addSuppressed(ex);
-
- throw e;
- }
-
-
schemaCh.changeSchema(SchemaSerializerImpl.INSTANCE.serialize(descriptor));
- }));
- }
- );
+ tableChange.accept(tblCh);
+
+ ExtendedTableChange exTblChange =
(ExtendedTableChange) tblCh;
+
+ int lastSchemaVer =
latestSchemaVersion(metaStorageMgr, exTblChange.id());
+
+ exTblChange.changeSchemaId(lastSchemaVer + 1);
Review Comment:
```suggestion
exTblChange.changeSchemaId(exTblChange.schemaId() +
1);
```
Can we do this?
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/** Constructor. */
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>>
registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
- ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- return onSchemaCreate(schemasCtx);
- }
- });
+ tablesCfg.tables().any().columns().listen(this::onSchemaChange);
}
/**
* Listener of schema configuration changes.
*
- * @param schemasCtx Schemas configuration context.
+ * @param ctx Configuration context.
* @return A future.
*/
- private CompletableFuture<?>
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
if (!busyLock.enterBusy()) {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- long causalityToken = schemasCtx.storageRevision();
+ ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
+
+ int verFromUpdate = tblCfg.schemaId();
+
+ UUID tblId = tblCfg.id();
+
+ String tableName = tblCfg.name();
+
+ SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
- ExtendedTableConfiguration tblCfg =
schemasCtx.config(ExtendedTableConfiguration.class);
+ byte[] curSchemaDesc = schemaById(metastorageMgr, tblId,
verFromUpdate);
- UUID tblId = tblCfg.id().value();
+ if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+ byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId,
verFromUpdate - 1);
+ assert oldSchemaSerialized != null;
+ SchemaDescriptor oldSchema =
SchemaSerializerImpl.INSTANCE.deserialize(oldSchemaSerialized);
- String tableName = tblCfg.name().value();
+ NamedListView<ColumnView> oldCols = ctx.oldValue();
+ NamedListView<ColumnView> newCols = ctx.newValue();
- SchemaDescriptor schemaDescriptor =
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+ schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+ oldSchema,
+ oldCols,
+ schemaDescFromUpdate,
+ newCols));
+ }
+
+ long causalityToken = ctx.storageRevision();
- CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+ CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
- () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+ () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+
+ if (curSchemaDesc == null) {
+ try {
+ byte[] serialized =
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
+
+ createSchemaFut.thenCompose(t -> metastorageMgr.put(
+ schemaWithVerHistKey(tblId, verFromUpdate),
serialized)
+ ).thenApply(t -> t);
Review Comment:
`.thenApply(t -> t)` doesn't make sense
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/** Constructor. */
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>>
registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
- ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- return onSchemaCreate(schemasCtx);
- }
- });
+ tablesCfg.tables().any().columns().listen(this::onSchemaChange);
}
/**
* Listener of schema configuration changes.
*
- * @param schemasCtx Schemas configuration context.
+ * @param ctx Configuration context.
* @return A future.
*/
- private CompletableFuture<?>
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
if (!busyLock.enterBusy()) {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- long causalityToken = schemasCtx.storageRevision();
+ ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
+
+ int verFromUpdate = tblCfg.schemaId();
+
+ UUID tblId = tblCfg.id();
+
+ String tableName = tblCfg.name();
+
+ SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
- ExtendedTableConfiguration tblCfg =
schemasCtx.config(ExtendedTableConfiguration.class);
+ byte[] curSchemaDesc = schemaById(metastorageMgr, tblId,
verFromUpdate);
- UUID tblId = tblCfg.id().value();
+ if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+ byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId,
verFromUpdate - 1);
Review Comment:
Can we get the previous schema from registry?
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaUtils.java:
##########
@@ -139,4 +147,103 @@ public static boolean equalSchemas(SchemaDescriptor exp,
SchemaDescriptor actual
return true;
}
+
+ /**
+ * Forms schema history key.
+ *
+ * @param tblId Table id.
+ * @param ver Schema version.
+ * @return {@link ByteArray} representation.
+ */
+ public static ByteArray schemaWithVerHistKey(UUID tblId, int ver) {
Review Comment:
I believe these new methods should be internals of SchemaManager, and we
must not expose such information
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/** Constructor. */
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>>
registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
- ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- return onSchemaCreate(schemasCtx);
- }
- });
+ tablesCfg.tables().any().columns().listen(this::onSchemaChange);
}
/**
* Listener of schema configuration changes.
*
- * @param schemasCtx Schemas configuration context.
+ * @param ctx Configuration context.
* @return A future.
*/
- private CompletableFuture<?>
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
if (!busyLock.enterBusy()) {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- long causalityToken = schemasCtx.storageRevision();
+ ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
+
+ int verFromUpdate = tblCfg.schemaId();
+
+ UUID tblId = tblCfg.id();
+
+ String tableName = tblCfg.name();
+
+ SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
- ExtendedTableConfiguration tblCfg =
schemasCtx.config(ExtendedTableConfiguration.class);
+ byte[] curSchemaDesc = schemaById(metastorageMgr, tblId,
verFromUpdate);
Review Comment:
seems like we can avoid this read
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -84,50 +90,83 @@ public class SchemaManager extends Producer<SchemaEvent,
SchemaEventParameters>
/** Versioned store for tables by name. */
private final VersionedValue<Map<UUID, SchemaRegistryImpl>> registriesVv;
+ /** Meta storage manager. */
+ private final MetaStorageManager metastorageMgr;
+
/** Constructor. */
- public SchemaManager(Consumer<Function<Long, CompletableFuture<?>>>
registry, TablesConfiguration tablesCfg) {
+ public SchemaManager(
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ TablesConfiguration tablesCfg,
+ MetaStorageManager metastorageMgr
+ ) {
this.registriesVv = new VersionedValue<>(registry, HashMap::new);
-
this.tablesCfg = tablesCfg;
+ this.metastorageMgr = metastorageMgr;
}
/** {@inheritDoc} */
@Override
public void start() {
- ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- return onSchemaCreate(schemasCtx);
- }
- });
+ tablesCfg.tables().any().columns().listen(this::onSchemaChange);
}
/**
* Listener of schema configuration changes.
*
- * @param schemasCtx Schemas configuration context.
+ * @param ctx Configuration context.
* @return A future.
*/
- private CompletableFuture<?>
onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ private CompletableFuture<?>
onSchemaChange(ConfigurationNotificationEvent<NamedListView<ColumnView>> ctx) {
if (!busyLock.enterBusy()) {
return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR,
new NodeStoppingException()));
}
try {
- long causalityToken = schemasCtx.storageRevision();
+ ExtendedTableView tblCfg = (ExtendedTableView)
ctx.config(ExtendedTableConfiguration.class).value();
+
+ int verFromUpdate = tblCfg.schemaId();
+
+ UUID tblId = tblCfg.id();
+
+ String tableName = tblCfg.name();
+
+ SchemaDescriptor schemaDescFromUpdate =
SchemaUtils.prepareSchemaDescriptor(verFromUpdate, tblCfg);
- ExtendedTableConfiguration tblCfg =
schemasCtx.config(ExtendedTableConfiguration.class);
+ byte[] curSchemaDesc = schemaById(metastorageMgr, tblId,
verFromUpdate);
- UUID tblId = tblCfg.id().value();
+ if (verFromUpdate != INITIAL_SCHEMA_VERSION) {
+ byte[] oldSchemaSerialized = schemaById(metastorageMgr, tblId,
verFromUpdate - 1);
+ assert oldSchemaSerialized != null;
+ SchemaDescriptor oldSchema =
SchemaSerializerImpl.INSTANCE.deserialize(oldSchemaSerialized);
- String tableName = tblCfg.name().value();
+ NamedListView<ColumnView> oldCols = ctx.oldValue();
+ NamedListView<ColumnView> newCols = ctx.newValue();
- SchemaDescriptor schemaDescriptor =
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+ schemaDescFromUpdate.columnMapping(SchemaUtils.columnMapper(
+ oldSchema,
+ oldCols,
+ schemaDescFromUpdate,
+ newCols));
+ }
+
+ long causalityToken = ctx.storageRevision();
- CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+ CompletableFuture<?> createSchemaFut =
createSchema(causalityToken, tblId, tableName, schemaDescFromUpdate);
registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
- () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+ () -> fireEvent(SchemaEvent.CREATE, new
SchemaEventParameters(causalityToken, tblId, schemaDescFromUpdate))));
+
+ if (curSchemaDesc == null) {
+ try {
+ byte[] serialized =
SchemaSerializerImpl.INSTANCE.serialize(schemaDescFromUpdate);
+
+ createSchemaFut.thenCompose(t -> metastorageMgr.put(
+ schemaWithVerHistKey(tblId, verFromUpdate),
serialized)
+ ).thenApply(t -> t);
Review Comment:
Actually, this whole try - catch block is problematic. Why do you write the
schema after the operation is considered completed?
##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -281,59 +336,34 @@ private boolean checkSchemaVersion(UUID tblId, int
schemaVer) {
}
/**
- * Checks that the schema is configured in the Metasorage consensus.
+ * Try to found schema in cache.
Review Comment:
```suggestion
* Try to find schema in cache.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]