zstan commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r707478098



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -123,42 +122,143 @@
     private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
 
     /** Tables. */
-    private final Map<UUID, TableImpl> tablesById = new ConcurrentHashMap<>();
+    private final Map<IgniteUuid, TableImpl> tablesById = new 
ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table 
creation on local node from within the context
+     * of user's table creation intention.
+     *
+     * In other words, awaiting local {@link 
TableManager#createTableLocally(String, IgniteUuid, List, SchemaDescriptor)}
+     * from within {@link TableManager#createTableAsync(String, Consumer, 
boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Table>> createTblIntention 
= new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table alter 
on local node from within the context
+     * of user's table alter intention.
+     *
+     * In other words, awaiting local alter table as a reaction to a distributed 
event
+     * from within {@link TableManager#createTableAsync(String, Consumer, 
boolean)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> alterTblIntention = 
new ConcurrentHashMap<>();
+
+    /**
+     * tableId -> future that adds an ability to await distributed table drop 
on local node from within the context
+     * of user's table drop intention.
+     *
+     * In other words, awaiting local {@link 
TableManager#dropTableLocally(String, IgniteUuid, List)}
+     * from within {@link TableManager#dropTableAsync(String)}
+     */
+    private final Map<IgniteUuid, CompletableFuture<Void>> dropTblIntention = 
new ConcurrentHashMap<>();
 
     /**
      * Creates a new table manager.
      *
-     * @param nodeCfgMgr Node configuration manager.
      * @param clusterCfgMgr Cluster configuration manager.
-     * @param metaStorageMgr Meta storage manager.
-     * @param schemaMgr Schema manager.
-     * @param affMgr Affinity manager.
      * @param raftMgr Raft manager.
+     * @param baselineMgr Baseline manager.
+     * @param metaStorageMgr Meta storage manager.
      * @param partitionsStoreDir Partitions store directory.
      */
     public TableManager(
-        ConfigurationManager nodeCfgMgr,
         ConfigurationManager clusterCfgMgr,
-        MetaStorageManager metaStorageMgr,
-        SchemaManager schemaMgr,
-        AffinityManager affMgr,
         Loza raftMgr,
+        BaselineManager baselineMgr,
+        MetaStorageManager metaStorageMgr,
         Path partitionsStoreDir
     ) {
-        this.nodeCfgMgr = nodeCfgMgr;
         this.clusterCfgMgr = clusterCfgMgr;
-        this.metaStorageMgr = metaStorageMgr;
-        this.affMgr = affMgr;
         this.raftMgr = raftMgr;
-        this.schemaMgr = schemaMgr;
+        this.baselineMgr = baselineMgr;
+        this.metaStorageMgr = metaStorageMgr;
         this.partitionsStoreDir = partitionsStoreDir;
     }
 
     /** {@inheritDoc} */
     @Override public void start() {
-        //TODO: IGNITE-14652 Change a metastorage update in listeners to 
multi-invoke
-        
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx
 -> {
-            return onConfigurationChanged(ctx.storageRevision(), 
ctx.oldValue(), ctx.newValue());
+        
clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().
+            listenElements(new ConfigurationNamedListListener<TableView>() {
+            @Override
+            public @NotNull CompletableFuture<?> onCreate(@NotNull 
ConfigurationNotificationEvent<TableView> ctx) {
+                // Empty assignments might be a valid case if tables are 
created from within cluster init HOCON
+                // configuration, which is not supported now.
+                assert ((ExtendedTableView)ctx.newValue()).assignments() != 
null :
+                    "Table =[" + ctx.newValue().name() + "] has empty 
assignments.";
+
+                final IgniteUuid tblId = 
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                // TODO: IGNITE-15409 Listener with any placeholder should be 
used instead.
+                ((ExtendedTableConfiguration) 
clusterCfgMgr.configurationRegistry().
+                    
getConfiguration(TablesConfiguration.KEY).tables().get(ctx.newValue().name())).schemas().
+                    listenElements(new ConfigurationNamedListListener<>() {
+                        @Override public @NotNull CompletableFuture<?> 
onCreate(
+                            @NotNull 
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+                            try {
+                                
((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaRegistry()).
+                                    
onSchemaRegistered((SchemaDescriptor)ByteUtils.
+                                        
fromBytes(schemasCtx.newValue().schema()));
+
+                                fireEvent(TableEvent.ALTER, new 
TableEventParameters(tablesById.get(tblId)), null);
+
+                                
Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> 
f.complete(null));
+                            }
+                            catch (Exception e) {
+                                
Optional.ofNullable(alterTblIntention.get(tblId)).ifPresent(f -> 
f.completeExceptionally(e));
+                            }
+
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override
+                        public @NotNull CompletableFuture<?> onRename(@NotNull 
String oldName, @NotNull String newName,
+                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> 
onDelete(
+                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+
+                        @Override public @NotNull CompletableFuture<?> 
onUpdate(
+                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                            return CompletableFuture.completedFuture(null);
+                        }
+                    });
+
+                createTableLocally(
+                    ctx.newValue().name(),
+                    
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
+                    
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
+                    
(SchemaDescriptor)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).schemas().
+                        get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
+                );
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override public @NotNull CompletableFuture<?> onRename(@NotNull 
String oldName, @NotNull String newName,
+                @NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                // TODO: IGNITE-15485 Support table rename operation.
+
+                return CompletableFuture.completedFuture(null);
+            }
+
+            @Override

Review comment:
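        @Override on a new line: here the annotation sits on its own line, whereas the surrounding code in this file (e.g. `@Override public void start()` and the preceding `onRename`) keeps it on the same line as the method signature. Please make it consistent.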




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to