sanpwc commented on a change in pull request #103:
URL: https://github.com/apache/ignite-3/pull/103#discussion_r620256609



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManagerImpl.java
##########
@@ -17,53 +17,282 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.runner.LocalConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableSchemaView;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.TableManager;
+import org.jetbrains.annotations.NotNull;
 
-/**
- * Table Manager that handles inner table lifecycle and provide corresponding 
API methods.
- */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class TableManagerImpl 
implements TableManager {
-    /** Meta storage service. */
-    private final MetaStorageManager metaStorageMgr;
+public class TableManagerImpl implements TableManager {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(TableManagerImpl.class);
 
-    /** Network cluster. */
-    private final ClusterService clusterNetSvc;
+    /** Internal prefix for the metasorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.";
 
-    /** Schema manager. */
-    private final SchemaManager schemaMgr;
+    /** Meta storage service. */
+    private final MetaStorageManager metaStorageMgr;
 
     /** Configuration manager. */
     private final ConfigurationManager configurationMgr;
 
-    /** Raft manager. */
-    private final Loza raftMgr;
+    /** Table creation subscription future. */
+    private CompletableFuture<Long> tableCreationSubscriptionFut;
+
+    /** Tables. */
+    private Map<String, Table> tables;
 
     /**
-     * The constructor.
-     *
-     * @param configurationMgr Configuration table.
-     * @param clusterNetSvc Cluster network service.
-     * @param metaStorageMgr MetaStorage manager.
-     * @param schemaMgr Schema manager.
+     * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Meta storage manager.
+     * @param schemaManager Schema manager.
      * @param raftMgr Raft manager.
      */
     public TableManagerImpl(
         ConfigurationManager configurationMgr,
-        ClusterService clusterNetSvc,
         MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
+        SchemaManager schemaManager,
         Loza raftMgr
     ) {
+        tables = new HashMap<>();
+
         this.configurationMgr = configurationMgr;
-        this.clusterNetSvc = clusterNetSvc;
         this.metaStorageMgr = metaStorageMgr;
-        this.schemaMgr = schemaMgr;
-        this.raftMgr = raftMgr;
+
+        String localNodeName = 
configurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY)
+            .name().value();
+
+        
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().listen(ctx -> {
+            if (ctx.newValue() != null) {
+                if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+                    subscribeForTableCreation();
+                else
+                    unsubscribeForTableCreation();
+            }
+            return CompletableFuture.completedFuture(null);
+
+        });
+
+        String[] metastorageMembers = 
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().value();
+
+        if (hasMetastorageLocally(localNodeName, metastorageMembers))
+            subscribeForTableCreation();
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        tableCreationSubscriptionFut = metaStorageMgr.registerWatch(new 
Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    if (!ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        String placeholderValue = keyTail.substring(0, 
keyTail.indexOf('.'));
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(metaStorageMgr.get(
+                                new Key(INTERNAL_PREFIX + 
tblId.toString())).get()
+                                .value(), StandardCharsets.UTF_8);
+                            int partitions = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+
+                            List<List<ClusterNode>> assignment = 
(List<List<ClusterNode>>)IgniteUtils.fromBytes(
+                                evt.newEntry().value());
+
+                            HashMap<Integer, RaftGroupService> partitionMap = 
new HashMap<>(partitions);
+
+                            for (int p = 0; p < partitions; p++) {
+                                partitionMap.put(p, raftMgr.startRaftGroup(

Review comment:
       Definitely we will need sort of async notifications processing. However 
it's not the part of current design.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to