vldpyatkov commented on a change in pull request #399:
URL: https://github.com/apache/ignite-3/pull/399#discussion_r733615048



##########
File path: 
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,144 +195,199 @@ public TableManager(
             listenElements(new ConfigurationNamedListListener<TableView>() {
             @Override
             public @NotNull CompletableFuture<?> onCreate(@NotNull 
ConfigurationNotificationEvent<TableView> ctx) {
-                // Empty assignments might be a valid case if tables are 
created from within cluster init HOCON
-                // configuration, which is not supported now.
-                assert ((ExtendedTableView)ctx.newValue()).assignments() != 
null :
-                    "Table =[" + ctx.newValue().name() + "] has empty 
assignments.";
-
-                final IgniteUuid tblId = 
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
-
-                // TODO: IGNITE-15409 Listener with any placeholder should be 
used instead.
-                
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).schemas().
-                    listenElements(new ConfigurationNamedListListener<>() {
-                        @Override public @NotNull CompletableFuture<?> 
onCreate(
-                            @NotNull 
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-                            try {
-                                
((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaView()).
-                                    onSchemaRegistered(
-                                        
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
-                                    );
+                if (!busyLock.enterBusy()) {
+                    String tblName = ctx.newValue().name();
+                    IgniteUuid tblId = 
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                    fireEvent(TableEvent.CREATE,
+                        new TableEventParameters(tblId, tblName),
+                        new NodeStoppingException("Operation has been 
cancelled (node is stopping)."));
+                }
+                try {
+                    onTableCreateInternal(ctx);
+                }
+                finally {
+                    busyLock.leaveBusy();
+                }
 
-                                fireEvent(TableEvent.ALTER, new 
TableEventParameters(tablesById.get(tblId)), null);
+                return CompletableFuture.completedFuture(null);
+            }
+
+                /**
+                 * Method for handle a table configuration event.
+                 *
+                 * @param ctx Configuration event.
+                 */
+                private void onTableCreateInternal(@NotNull 
ConfigurationNotificationEvent<TableView> ctx) {
+                    String tblName = ctx.newValue().name();
+                    IgniteUuid tblId = 
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+                    // Empty assignments might be a valid case if tables are 
created from within cluster init HOCON
+                    // configuration, which is not supported now.
+                    assert ((ExtendedTableView)ctx.newValue()).assignments() 
!= null :
+                        LoggerMessageHelper.format("Table [id={}, name={}] has 
empty assignments.", tblId, tblName);
+
+                    // TODO: IGNITE-15409 Listener with any placeholder should 
be used instead.
+                    
((ExtendedTableConfiguration)tablesCfg.tables().get(tblName)).schemas().
+                        listenElements(new ConfigurationNamedListListener<>() {
+                            @Override public @NotNull CompletableFuture<?> 
onCreate(
+                                @NotNull 
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+                                if (!busyLock.enterBusy()) {
+                                    fireEvent(TableEvent.ALTER, new 
TableEventParameters(tblId, tblName),
+                                        new NodeStoppingException("Operation 
has been cancelled (node is stopping)."));
+                                }
+                                try {
+                                    
((SchemaRegistryImpl)tables.get(tblName).schemaView()).
+                                        onSchemaRegistered(
+                                            
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
+                                        );
+
+                                    fireEvent(TableEvent.ALTER, new 
TableEventParameters(tablesById.get(tblId)), null);
+                                }
+                                catch (Exception e) {
+                                    fireEvent(TableEvent.ALTER, new 
TableEventParameters(tblId, tblName), e);
+                                }
+                                finally {
+                                    busyLock.leaveBusy();
+                                }
+
+                                return CompletableFuture.completedFuture(null);
                             }
-                            catch (Exception e) {
-                                fireEvent(TableEvent.ALTER, new 
TableEventParameters(tblId, ctx.newValue().name()), e);
+
+                            @Override public @NotNull CompletableFuture<?> 
onRename(@NotNull String oldName,
+                                @NotNull String newName,
+                                @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                                return CompletableFuture.completedFuture(null);
                             }
 
-                            return CompletableFuture.completedFuture(null);
-                        }
+                            @Override public @NotNull CompletableFuture<?> 
onDelete(
+                                @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                                return CompletableFuture.completedFuture(null);
+                            }
 
-                        @Override
-                        public @NotNull CompletableFuture<?> onRename(@NotNull 
String oldName, @NotNull String newName,
-                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
-                            return CompletableFuture.completedFuture(null);
-                        }
+                            @Override public @NotNull CompletableFuture<?> 
onUpdate(
+                                @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                        });
 
-                        @Override public @NotNull CompletableFuture<?> 
onDelete(
-                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
-                            return CompletableFuture.completedFuture(null);
-                        }
+                    
((ExtendedTableConfiguration)tablesCfg.tables().get(tblName)).assignments().
+                        listen(assignmentsCtx -> {
+                            List<List<ClusterNode>> oldAssignments =
+                                
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
 
-                        @Override public @NotNull CompletableFuture<?> 
onUpdate(
-                            @NotNull 
ConfigurationNotificationEvent<SchemaView> ctx) {
-                            return CompletableFuture.completedFuture(null);
-                        }
-                    });
+                            List<List<ClusterNode>> newAssignments =
+                                
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+                            CompletableFuture<?>[] futures = new 
CompletableFuture<?>[oldAssignments.size()];
+
+                            // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
+                            // TODO: Until IGNITE-15554 is implemented it's 
safe to iterate over partitions and replicas cause there will
+                            // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
+                            for (int i = 0; i < oldAssignments.size(); i++) {
+                                int partId = i;
+
+                                List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(partId);
+                                List<ClusterNode> newPartitionAssignment = 
newAssignments.get(partId);
 
-                
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
-                    listen(assignmentsCtx -> {
-                        List<List<ClusterNode>> oldAssignments =
-                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
-                        List<List<ClusterNode>> newAssignments =
-                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
-
-                        CompletableFuture<?>[] futures = new 
CompletableFuture<?>[oldAssignments.size()];
-
-                        // TODO: IGNITE-15554 Add logic for assignment 
recalculation in case of partitions or replicas changes
-                        // TODO: Until IGNITE-15554 is implemented it's safe 
to iterate over partitions and replicas cause there will
-                        // TODO: be exact same amount of partitions and 
replicas for both old and new assignments
-                        for (int i = 0; i < oldAssignments.size(); i++) {
-                            final int p = i;
-
-                            List<ClusterNode> oldPartitionAssignment = 
oldAssignments.get(p);
-                            List<ClusterNode> newPartitionAssignment = 
newAssignments.get(p);
-
-                            var toAdd = new HashSet<>(newPartitionAssignment);
-                            var toRemove = new 
HashSet<>(oldPartitionAssignment);
-
-                            toAdd.removeAll(oldPartitionAssignment);
-                            toRemove.removeAll(newPartitionAssignment);
-
-                            // Create new raft nodes according to new 
assignments.
-                            futures[i] = raftMgr.updateRaftGroup(
-                                raftGroupName(tblId, p),
-                                newPartitionAssignment,
-                                toAdd,
-                                () -> new 
PartitionListener(tableStorages.get(tblId).getOrCreatePartition(p))
-                            )
-                                .thenAccept(
-                                    updatedRaftGroupService -> 
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p, 
updatedRaftGroupService)
-                                ).thenRun(() -> 
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+                                var toAdd = new 
HashSet<>(newPartitionAssignment);
+                                var toRemove = new 
HashSet<>(oldPartitionAssignment);
+
+                                toAdd.removeAll(oldPartitionAssignment);
+                                toRemove.removeAll(newPartitionAssignment);
+
+                                InternalTable internalTable = 
tablesById.get(tblId).internalTable();
+
+                                // Create new raft nodes according to new 
assignments.
+                                futures[i] = raftMgr.prepareRaftGroup(
+                                    raftGroupName(tblId, partId),
+                                    newPartitionAssignment,
+                                    () -> new 
PartitionListener(internalTable.storage().getOrCreatePartition(partId))
+                                ).thenAccept(
+                                    updatedRaftGroupService -> 
internalTable.updateInternalTableRaftGroupService(partId, 
updatedRaftGroupService)
                                 ).exceptionally(th -> {
                                         LOG.error("Failed to update raft 
groups one the node", th);
+
                                         return null;
                                     }
                                 );
-                        }
+                            }
 
-                        return CompletableFuture.allOf(futures);
-                    });
+                            return CompletableFuture.allOf(futures);
+                        });
 
-                createTableLocally(
-                    ctx.newValue().name(),
-                    
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
-                    
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
-                    
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
-                        get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
-                );
+                    createTableLocally(
+                        tblName,
+                        tblId,
+                        
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
+                        
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
+                            
get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
+                    );
+                }
 
-                return CompletableFuture.completedFuture(null);
-            }
+                @Override
+                public @NotNull CompletableFuture<?> onRename(@NotNull String 
oldName, @NotNull String newName,
+                    @NotNull ConfigurationNotificationEvent<TableView> ctx) {
+                    // TODO: IGNITE-15485 Support table rename operation.
 
-            @Override public @NotNull CompletableFuture<?> onRename(@NotNull 
String oldName, @NotNull String newName,
-                @NotNull ConfigurationNotificationEvent<TableView> ctx) {
-                // TODO: IGNITE-15485 Support table rename operation.
+                    return CompletableFuture.completedFuture(null);
+                }
 
-                return CompletableFuture.completedFuture(null);
-            }
+                @Override public @NotNull CompletableFuture<?> onDelete(
+                    @NotNull ConfigurationNotificationEvent<TableView> ctx
+                ) {
+                    if (!busyLock.enterBusy()) {
+                        String tblName = ctx.oldValue().name();
+                        IgniteUuid tblId = 
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id());
 
-            @Override public @NotNull CompletableFuture<?> onDelete(
-                @NotNull ConfigurationNotificationEvent<TableView> ctx
-            ) {
-                dropTableLocally(
-                    ctx.oldValue().name(),
-                    
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id()),
-                    
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.oldValue()).assignments())
-                );
+                        fireEvent(TableEvent.DROP, new 
TableEventParameters(tblId, tblName),
+                            new NodeStoppingException("Operation has been 
cancelled (node is stopping)."));
+                    }
+                    try {
+                        dropTableLocally(
+                            ctx.oldValue().name(),
+                            
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id()),
+                            
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.oldValue()).assignments())
+                        );
+                    }
+                    finally {
+                        busyLock.leaveBusy();
+                    }
 
-                return CompletableFuture.completedFuture(null);
-            }
+                    return CompletableFuture.completedFuture(null);
+                }
 
-            @Override
-            public @NotNull CompletableFuture<?> onUpdate(@NotNull 
ConfigurationNotificationEvent<TableView> ctx) {
-                return CompletableFuture.completedFuture(null);
-            }
-        });
+                @Override
+                public @NotNull CompletableFuture<?> onUpdate(@NotNull 
ConfigurationNotificationEvent<TableView> ctx) {
+                    return CompletableFuture.completedFuture(null);
+                }
+            });
 
         this.defaultDataRegion = 
engine.createDataRegion(dataStorageCfg.defaultRegion());
 
         defaultDataRegion.start();
     }
 
     /** {@inheritDoc} */
-    @Override public void stop() {
-        for (TableStorage tableStorage : tableStorages.values()) {
+    @Override public synchronized void stop() {
+        synchronized (busyLock) {

Review comment:
       Fixed for both components.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to