vldpyatkov commented on a change in pull request #115:
URL: https://github.com/apache/ignite-3/pull/115#discussion_r623815554



##########
File path: 
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
##########
@@ -17,36 +17,138 @@
 
 package org.apache.ignite.internal.schema;
 
+import java.util.ArrayList;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
 
 /**
  * Schema Manager.
  */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class SchemaManager {
-    /** Configuration manager in order to handle and listen schema specific 
configuration.*/
+public class SchemaManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(SchemaManager.class);
+
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.schema.";
+
+    /** Configuration manager in order to handle and listen schema specific 
configuration. */
     private final ConfigurationManager configurationMgr;
 
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
     /** Schema. */
-    private final SchemaDescriptor schema;
+    private final Map<UUID, SchemaDescriptor> schemes = new 
ConcurrentHashMap<>();
 
     /**
      * The constructor.
      *
      * @param configurationMgr Configuration manager.
+     * @param metaStorageManager Metastorage manager.
+     * @param vaultManager Vault manager.
      */
-    public SchemaManager(ConfigurationManager configurationMgr) {
+    public SchemaManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageManager,
+        VaultManager vaultManager
+    ) {
         this.configurationMgr = configurationMgr;
+        this.metaStorageManager = metaStorageManager;
+        this.vaultManager = vaultManager;
+
+//        this.schema = new SchemaDescriptor(1,
+//            new Column[] {
+//                new Column("key", NativeType.LONG, false)
+//            },
+//            new Column[] {
+//                new Column("value", NativeType.LONG, false)
+//            }
+//        );
+
+        metaStorageManager.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new 
WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                    UUID tblId = UUID.fromString(keyTail.substring(0, 
keyTail.indexOf('.')));
 
-        this.schema = new SchemaDescriptor(1,
-            new Column[] {
-                new Column("key", NativeType.LONG, false)
-            },
-            new Column[] {
-                new Column("value", NativeType.LONG, false)
+                    int schemaIdVal = 
Integer.parseInt(keyTail.substring(keyTail.indexOf('.'), keyTail.length() - 1));
+
+                    if (evt.newEntry().value() == null)
+                        schemes.computeIfPresent(tblId, (key, val) -> 
val.version() == schemaIdVal ? null : val);
+                    else {
+                        schemes.compute(tblId, (key, val) -> val == null || 
schemaIdVal > val.version() ?
+                            
(SchemaDescriptor)ByteUtils.fromBytes(evt.newEntry().value()) : val);
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Faled to notyfy Schema manager.", e);
             }
-        );
+        });
+    }
+
+    /**
+     * Registers new schema.
+     *
+     * @param tableId Table identifier.
+     * @param desc Schema descriptor.
+     */
+    public CompletableFuture<Boolean> registerSchema(UUID tableId, 
SchemaDescriptor desc) {
+        int schemaVersion = desc.version();
+
+        return metaStorageManager.invoke(new Key(INTERNAL_PREFIX
+                //Tbale id

Review comment:
       Removed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to