ascherbakoff commented on a change in pull request #103:
URL: https://github.com/apache/ignite-3/pull/103#discussion_r619363276



##########
File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManagerImpl.java
##########
@@ -17,53 +17,282 @@
 
 package org.apache.ignite.internal.table.distributed;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
+import org.apache.ignite.configuration.schemas.runner.LocalConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.internal.schema.SchemaManager;
 import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableSchemaView;
+import 
org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.table.manager.TableManager;
+import org.jetbrains.annotations.NotNull;
 
-/**
- * Table Manager that handles inner table lifecycle and provide corresponding 
API methods.
- */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class TableManagerImpl 
implements TableManager {
-    /** Meta storage service. */
-    private final MetaStorageManager metaStorageMgr;
+public class TableManagerImpl implements TableManager {
+    /** Internal prefix for the metasorage. */
+    public static final String INTERNAL_PREFIX = "internal.tables.";
 
-    /** Network cluster. */
-    private final ClusterService clusterNetSvc;
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(TableManagerImpl.class);
 
-    /** Schema manager. */
-    private final SchemaManager schemaMgr;
+    /** Meta storage service. */
+    private final MetaStorageManager metaStorageMgr;
 
     /** Configuration manager. */
     private final ConfigurationManager configurationMgr;
 
-    /** Raft manager. */
-    private final Loza raftMgr;
+    /** Table creation subscription future. */
+    private CompletableFuture<Long> tableCreationSubscriptionFut;
+
+    /** Tables. */
+    private Map<String, Table> tables;
 
     /**
-     * The constructor.
-     *
-     * @param configurationMgr Configuration table.
-     * @param clusterNetSvc Cluster network service.
-     * @param metaStorageMgr MetaStorage manager.
-     * @param schemaMgr Schema manager.
+     * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Meta storage manager.
+     * @param schemaManager Schema manager.
      * @param raftMgr Raft manager.
      */
     public TableManagerImpl(
         ConfigurationManager configurationMgr,
-        ClusterService clusterNetSvc,
         MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
+        SchemaManager schemaManager,
         Loza raftMgr
     ) {
+        tables = new HashMap<>();
+
         this.configurationMgr = configurationMgr;
-        this.clusterNetSvc = clusterNetSvc;
         this.metaStorageMgr = metaStorageMgr;
-        this.schemaMgr = schemaMgr;
-        this.raftMgr = raftMgr;
+
+        String localNodeName = 
configurationMgr.configurationRegistry().getConfiguration(NetworkConfiguration.KEY)
+            .name().value();
+
+        
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().listen(ctx -> {
+            if (ctx.newValue() != null) {
+                if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+                    subscribeForTableCreation();
+                else
+                    unsubscribeForTableCreation();
+            }
+            return CompletableFuture.completedFuture(null);
+
+        });
+
+        String[] metastorageMembers = 
configurationMgr.configurationRegistry().getConfiguration(LocalConfiguration.KEY)
+            .metastorageMembers().value();
+
+        if (hasMetastorageLocally(localNodeName, metastorageMembers))
+            subscribeForTableCreation();
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        tableCreationSubscriptionFut = metaStorageMgr.registerWatch(new 
Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> 
events) {
+                for (WatchEvent evt : events) {
+                    if (!ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = 
evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        String placeholderValue = keyTail.substring(0, 
keyTail.indexOf('.'));
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(metaStorageMgr.get(
+                                new Key(INTERNAL_PREFIX + 
tblId.toString())).get()
+                                .value(), StandardCharsets.UTF_8);
+                            int partitions = 
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+
+                            List<List<ClusterNode>> assignment = 
(List<List<ClusterNode>>)IgniteUtils.fromBytes(
+                                evt.newEntry().value());
+
+                            HashMap<Integer, RaftGroupService> partitionMap = 
new HashMap<>(partitions);
+
+                            for (int p = 0; p < partitions; p++) {
+                                partitionMap.put(p, raftMgr.startRaftGroup(
+                                    name + "_part_" + p,
+                                    assignment.get(p),
+                                    new PartitionCommandListener()
+                                ));
+                            }
+
+                            tables.put(name, new TableImpl(
+                                new InternalTableImpl(
+                                    tblId,
+                                    partitionMap,
+                                    partitions
+                                ),
+                                new TableSchemaView() {
+                                    @Override public SchemaDescriptor schema() 
{
+                                        return schemaManager.schema(tblId);
+                                    }
+
+                                    @Override public SchemaDescriptor 
schema(int ver) {
+                                        return schemaManager.schema(tblId, 
ver);
+                                    }
+                                }));
+                        }
+                        catch (InterruptedException | ExecutionException e) {
+                            LOG.error("Failed to start table [key={}]",
+                                evt.newEntry().key(), e);
+                        }
+                    }
+                }
+
+                return false;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Tests a node has a involved into Metastorage.
+     *
+     * @param localNodeName Local node uniq name.
+     * @param metastorageMembers Metastorage members names.
+     * @return True if the node has Metastorage, false otherwise.
+     */
+    private boolean hasMetastorageLocally(String localNodeName, String[] 
metastorageMembers) {
+        boolean isLocalNodeHasMetasorage = false;
+
+        for (String name : metastorageMembers) {
+            if (name.equals(localNodeName)) {
+                isLocalNodeHasMetasorage = true;
+
+                break;
+            }
+        }
+        return isLocalNodeHasMetasorage;
+    }
+
+    /**
+     * Subscribes on table create.
+     */
+    private void subscribeForTableCreation() {
+        
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+            .tables().listen(ctx -> {
+            HashSet<String> tblNamesToStart = new 
HashSet<>(ctx.newValue().namedListKeys());
+
+            long revision = ctx.storageRevision();
+
+            if (ctx.oldValue() != null)
+                tblNamesToStart.removeAll(ctx.oldValue().namedListKeys());
+
+            for (String tblName : tblNamesToStart) {
+                TableView tableView = ctx.newValue().get(tblName);
+                long update = 0;
+
+                UUID tblId = new UUID(revision, update);
+
+                CompletableFuture<Boolean> fut = metaStorageMgr.invoke(

Review comment:
       What is the point of the conditional update?
   The keys seem to be unique.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to