sanpwc commented on a change in pull request #91:
URL: https://github.com/apache/ignite-3/pull/91#discussion_r632550277
##########
File path:
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,60 +17,310 @@
package org.apache.ignite.internal.schema;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.table.ColumnTypeView;
+import org.apache.ignite.configuration.schemas.table.ColumnView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.configuration.tree.NamedListView;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryException;
+import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.schema.PrimaryIndex;
+import org.jetbrains.annotations.NotNull;
/**
* Schema Manager.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
- /** Configuration manager in order to handle and listen schema specific
configuration.*/
+public class SchemaManager {
+ /** The logger. */
+ private static final IgniteLogger LOG =
IgniteLogger.forClass(SchemaManager.class);
+
+ /** Internal prefix for the metasorage. */
+ private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+ /** Schema history item key suffix. */
+ protected static final String INTERNAL_VER_SUFFIX = ".ver.";
+
+ /** Configuration manager in order to handle and listen schema specific
configuration. */
private final ConfigurationManager configurationMgr;
+ /** Metastorage manager. */
+ private final MetaStorageManager metaStorageMgr;
+
+ /** Vault manager. */
+ private final VaultManager vaultManager;
+
+ /** Schema history subscription future. */
+ private CompletableFuture<Long> schemaHistorySubscriptionFut;
+
/** Schema. */
- private final SchemaDescriptor schema;
+ private final Map<UUID, SchemaRegistryImpl> schemes = new
ConcurrentHashMap<>();
/**
* The constructor.
*
* @param configurationMgr Configuration manager.
+ * @param metaStorageMgr Metastorage manager.
+ * @param vaultManager Vault manager.
*/
- public SchemaManager(ConfigurationManager configurationMgr) {
+ public SchemaManager(
+ ConfigurationManager configurationMgr,
+ MetaStorageManager metaStorageMgr,
+ VaultManager vaultManager
+ ) {
this.configurationMgr = configurationMgr;
+ this.metaStorageMgr = metaStorageMgr;
+ this.vaultManager = vaultManager;
+
+ schemaHistorySubscriptionFut =
metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new
WatchListener() {
+ /** {@inheritDoc} */
+ @Override public boolean onUpdate(@NotNull Iterable<WatchEvent>
events) {
+ for (WatchEvent evt : events) {
+ String keyTail =
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length() - 1);
+
+ int verPos = keyTail.indexOf(INTERNAL_VER_SUFFIX);
+
+ // Last table schema version changed.
+ if (verPos == -1) {
+ UUID tblId = UUID.fromString(keyTail);
- this.schema = new SchemaDescriptor(1,
- new Column[] {
- new Column("key", NativeType.LONG, false)
- },
- new Column[] {
- new Column("value", NativeType.LONG, false)
+ if (evt.oldEntry() == null) // Initial schema added.
+ schemes.put(tblId, new SchemaRegistryImpl());
+ else if (evt.newEntry() == null) // Table Dropped.
+ schemes.remove(tblId);
+ else //TODO:
https://issues.apache.org/jira/browse/IGNITE-13752
+ throw new SchemaRegistryException("Schema upgrade
is not implemented yet.");
+ }
+ else {
+ UUID tblId = UUID.fromString(keyTail.substring(0,
verPos));
+
+ final SchemaRegistryImpl reg = schemes.get(tblId);
+
+ assert reg != null : "Table schema was not initialized
or table has been dropped: " + tblId;
+
+
reg.registerSchema((SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()));
+ }
+ }
+
+ return true;
}
+
+ /** {@inheritDoc} */
+ @Override public void onError(@NotNull Throwable e) {
+ LOG.error("Metastorage listener issue", e);
+ }
+ });
+ }
+
+ /**
+ * Unsubscribes a listener form the affinity calculation.
+ */
+ private void unsubscribeFromAssignmentCalculation() {
+ if (schemaHistorySubscriptionFut == null)
+ return;
+
+ try {
+ Long subscriptionId = schemaHistorySubscriptionFut.get();
+
+ metaStorageMgr.unregisterWatch(subscriptionId);
+
+ schemaHistorySubscriptionFut = null;
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+ }
+ }
+
+ /**
+ * Reads current schema configuration, build schema descriptor,
+ * then add it to history rise up table schema version.
+ *
+ * @param tblId Table id.
+ * @param tblName Table name.
+ * @return Operation future.
+ */
+ public CompletableFuture<Boolean> initNewSchemaForTable(UUID tblId, String
tblName) {
+ return vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).
+ thenCompose(entry -> {
+ TableConfiguration tblConfig =
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().get(tblName);
+ var key = new Key(INTERNAL_PREFIX + tblId);
+
+ int ver = entry.empty() ? 1 :
(int)ByteUtils.bytesToLong(entry.value(), 0) + 1;
+
+ final SchemaDescriptor desc =
createSchemaDescriptor(tblConfig, ver);
+
+ return metaStorageMgr.invoke(
+ Conditions.key(key).value().eq(entry.value()), // Won't to
rewrite if the version goes ahead.
+ List.of(
+ Operations.put(key, ByteUtils.longToBytes(ver)),
+ Operations.put(new Key(INTERNAL_PREFIX + tblId +
INTERNAL_VER_SUFFIX + ver), ByteUtils.toBytes(desc))
+ ),
+ List.of(
+ Operations.noop(),
Review comment:
A single Operations.noop() is enough; the list of failure operations does not
need to contain the same number of entries as the list of success operations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]