vldpyatkov commented on a change in pull request #399:
URL: https://github.com/apache/ignite-3/pull/399#discussion_r736425785
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,144 +195,199 @@ public TableManager(
listenElements(new ConfigurationNamedListListener<TableView>() {
@Override
public @NotNull CompletableFuture<?> onCreate(@NotNull
ConfigurationNotificationEvent<TableView> ctx) {
- // Empty assignments might be a valid case if tables are
created from within cluster init HOCON
- // configuration, which is not supported now.
- assert ((ExtendedTableView)ctx.newValue()).assignments() !=
null :
- "Table =[" + ctx.newValue().name() + "] has empty
assignments.";
-
- final IgniteUuid tblId =
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
-
- // TODO: IGNITE-15409 Listener with any placeholder should be
used instead.
-
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).schemas().
- listenElements(new ConfigurationNamedListListener<>() {
- @Override public @NotNull CompletableFuture<?>
onCreate(
- @NotNull
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- try {
-
((SchemaRegistryImpl)tables.get(ctx.newValue().name()).schemaView()).
- onSchemaRegistered(
-
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
- );
+ if (!busyLock.enterBusy()) {
+ String tblName = ctx.newValue().name();
+ IgniteUuid tblId =
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+ fireEvent(TableEvent.CREATE,
+ new TableEventParameters(tblId, tblName),
+ new NodeStoppingException("Operation has been
cancelled (node is stopping)."));
+ }
+ try {
+ onTableCreateInternal(ctx);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
- fireEvent(TableEvent.ALTER, new
TableEventParameters(tablesById.get(tblId)), null);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Method for handle a table configuration event.
+ *
+ * @param ctx Configuration event.
+ */
+ private void onTableCreateInternal(@NotNull
ConfigurationNotificationEvent<TableView> ctx) {
+ String tblName = ctx.newValue().name();
+ IgniteUuid tblId =
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id());
+
+ // Empty assignments might be a valid case if tables are
created from within cluster init HOCON
+ // configuration, which is not supported now.
+ assert ((ExtendedTableView)ctx.newValue()).assignments()
!= null :
+ LoggerMessageHelper.format("Table [id={}, name={}] has
empty assignments.", tblId, tblName);
+
+ // TODO: IGNITE-15409 Listener with any placeholder should
be used instead.
+
((ExtendedTableConfiguration)tablesCfg.tables().get(tblName)).schemas().
+ listenElements(new ConfigurationNamedListListener<>() {
+ @Override public @NotNull CompletableFuture<?>
onCreate(
+ @NotNull
ConfigurationNotificationEvent<SchemaView> schemasCtx) {
+ if (!busyLock.enterBusy()) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(tblId, tblName),
+ new NodeStoppingException("Operation
has been cancelled (node is stopping)."));
+ }
+ try {
+
((SchemaRegistryImpl)tables.get(tblName).schemaView()).
+ onSchemaRegistered(
+
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()))
+ );
+
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(tablesById.get(tblId)), null);
+ }
+ catch (Exception e) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(tblId, tblName), e);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+
+ return CompletableFuture.completedFuture(null);
}
- catch (Exception e) {
- fireEvent(TableEvent.ALTER, new
TableEventParameters(tblId, ctx.newValue().name()), e);
+
+ @Override public @NotNull CompletableFuture<?>
onRename(@NotNull String oldName,
+ @NotNull String newName,
+ @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
+ return CompletableFuture.completedFuture(null);
}
- return CompletableFuture.completedFuture(null);
- }
+ @Override public @NotNull CompletableFuture<?>
onDelete(
+ @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
+ return CompletableFuture.completedFuture(null);
+ }
- @Override
- public @NotNull CompletableFuture<?> onRename(@NotNull
String oldName, @NotNull String newName,
- @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
+ @Override public @NotNull CompletableFuture<?>
onUpdate(
+ @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
- @Override public @NotNull CompletableFuture<?>
onDelete(
- @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
+
((ExtendedTableConfiguration)tablesCfg.tables().get(tblName)).assignments().
+ listen(assignmentsCtx -> {
+ List<List<ClusterNode>> oldAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
- @Override public @NotNull CompletableFuture<?>
onUpdate(
- @NotNull
ConfigurationNotificationEvent<SchemaView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
- });
+ List<List<ClusterNode>> newAssignments =
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+ CompletableFuture<?>[] futures = new
CompletableFuture<?>[oldAssignments.size()];
+
+ // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's
safe to iterate over partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and
replicas for both old and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ int partId = i;
+
+ List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(partId);
+ List<ClusterNode> newPartitionAssignment =
newAssignments.get(partId);
-
((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
- listen(assignmentsCtx -> {
- List<List<ClusterNode>> oldAssignments =
-
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
- List<List<ClusterNode>> newAssignments =
-
(List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
-
- CompletableFuture<?>[] futures = new
CompletableFuture<?>[oldAssignments.size()];
-
- // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
- // TODO: Until IGNITE-15554 is implemented it's safe
to iterate over partitions and replicas cause there will
- // TODO: be exact same amount of partitions and
replicas for both old and new assignments
- for (int i = 0; i < oldAssignments.size(); i++) {
- final int p = i;
-
- List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(p);
- List<ClusterNode> newPartitionAssignment =
newAssignments.get(p);
-
- var toAdd = new HashSet<>(newPartitionAssignment);
- var toRemove = new
HashSet<>(oldPartitionAssignment);
-
- toAdd.removeAll(oldPartitionAssignment);
- toRemove.removeAll(newPartitionAssignment);
-
- // Create new raft nodes according to new
assignments.
- futures[i] = raftMgr.updateRaftGroup(
- raftGroupName(tblId, p),
- newPartitionAssignment,
- toAdd,
- () -> new
PartitionListener(tableStorages.get(tblId).getOrCreatePartition(p))
- )
- .thenAccept(
- updatedRaftGroupService ->
tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p,
updatedRaftGroupService)
- ).thenRun(() ->
raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+ var toAdd = new
HashSet<>(newPartitionAssignment);
+ var toRemove = new
HashSet<>(oldPartitionAssignment);
+
+ toAdd.removeAll(oldPartitionAssignment);
+ toRemove.removeAll(newPartitionAssignment);
+
+ InternalTable internalTable =
tablesById.get(tblId).internalTable();
+
+ // Create new raft nodes according to new
assignments.
+ futures[i] = raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, partId),
+ newPartitionAssignment,
+ () -> new
PartitionListener(internalTable.storage().getOrCreatePartition(partId))
+ ).thenAccept(
+ updatedRaftGroupService ->
internalTable.updateInternalTableRaftGroupService(partId,
updatedRaftGroupService)
).exceptionally(th -> {
LOG.error("Failed to update raft
groups one the node", th);
+
return null;
}
);
- }
+ }
- return CompletableFuture.allOf(futures);
- });
+ return CompletableFuture.allOf(futures);
+ });
- createTableLocally(
- ctx.newValue().name(),
-
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
-
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
-
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
- get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
- );
+ createTableLocally(
+ tblName,
+ tblId,
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.newValue()).assignments()),
+
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)ctx.newValue()).schemas().
+
get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
+ );
+ }
- return CompletableFuture.completedFuture(null);
- }
+ @Override
+ public @NotNull CompletableFuture<?> onRename(@NotNull String
oldName, @NotNull String newName,
+ @NotNull ConfigurationNotificationEvent<TableView> ctx) {
+ // TODO: IGNITE-15485 Support table rename operation.
- @Override public @NotNull CompletableFuture<?> onRename(@NotNull
String oldName, @NotNull String newName,
- @NotNull ConfigurationNotificationEvent<TableView> ctx) {
- // TODO: IGNITE-15485 Support table rename operation.
+ return CompletableFuture.completedFuture(null);
+ }
- return CompletableFuture.completedFuture(null);
- }
+ @Override public @NotNull CompletableFuture<?> onDelete(
+ @NotNull ConfigurationNotificationEvent<TableView> ctx
+ ) {
+ if (!busyLock.enterBusy()) {
+ String tblName = ctx.oldValue().name();
+ IgniteUuid tblId =
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id());
- @Override public @NotNull CompletableFuture<?> onDelete(
- @NotNull ConfigurationNotificationEvent<TableView> ctx
- ) {
- dropTableLocally(
- ctx.oldValue().name(),
-
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id()),
-
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.oldValue()).assignments())
- );
+ fireEvent(TableEvent.DROP, new
TableEventParameters(tblId, tblName),
+ new NodeStoppingException("Operation has been
cancelled (node is stopping)."));
+ }
+ try {
+ dropTableLocally(
+ ctx.oldValue().name(),
+
IgniteUuid.fromString(((ExtendedTableView)ctx.oldValue()).id()),
+
(List<List<ClusterNode>>)ByteUtils.fromBytes(((ExtendedTableView)ctx.oldValue()).assignments())
+ );
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
- return CompletableFuture.completedFuture(null);
- }
+ return CompletableFuture.completedFuture(null);
+ }
- @Override
- public @NotNull CompletableFuture<?> onUpdate(@NotNull
ConfigurationNotificationEvent<TableView> ctx) {
- return CompletableFuture.completedFuture(null);
- }
- });
+ @Override
+ public @NotNull CompletableFuture<?> onUpdate(@NotNull
ConfigurationNotificationEvent<TableView> ctx) {
+ return CompletableFuture.completedFuture(null);
+ }
+ });
this.defaultDataRegion =
engine.createDataRegion(dataStorageCfg.defaultRegion());
defaultDataRegion.start();
}
/** {@inheritDoc} */
- @Override public void stop() {
- for (TableStorage tableStorage : tableStorages.values()) {
+ @Override public synchronized void stop() {
+ synchronized (busyLock) {
+ if (!busyLock.enterBusy())
+ return;
+
+ busyLock.leaveBusy();
+
+ busyLock.block();
+ }
+
+ for (TableImpl table : tables.values()) {
try {
- tableStorage.stop();
+ table.close();
Review comment:
This code has changed: tables are no longer closed in the TableManager.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]