denis-chudov commented on a change in pull request #704:
URL: https://github.com/apache/ignite-3/pull/704#discussion_r820099130
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -465,13 +487,8 @@ private void createTableLocally(
long causalityToken,
String name,
UUID tblId,
- List<List<ClusterNode>> assignment,
Review comment:
There is still an old Javadoc entry for the removed `assignment` parameter.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
/** {@inheritDoc} */
@Override
public void start() {
- tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.newValue()).id();
-
- fireEvent(TableEvent.CREATE,
- new
TableEventParameters(ctx.storageRevision(), tblId, tblName),
- new NodeStoppingException()
- );
-
- return CompletableFuture.failedFuture(new
NodeStoppingException());
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- try {
- onTableCreateInternal(ctx);
- } finally {
- busyLock.leaveBusy();
- }
+ long causalityToken = schemasCtx.storageRevision();
- return CompletableFuture.completedFuture(null);
- }
+ ExtendedTableConfiguration tblCfg =
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
- /**
- * Method for handle a table configuration event.
- *
- * @param ctx Configuration event.
- */
- private void
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
- // Empty assignments might be a valid case if tables
are created from within cluster init HOCON
- // configuration, which is not supported now.
- assert ((ExtendedTableView)
ctx.newValue()).assignments() != null :
- IgniteStringFormatter.format("Table [id={},
name={}] has empty assignments.", tblId, tblName);
-
- // TODO: IGNITE-16369 Listener with any placeholder
should be used instead.
- ((ExtendedTableConfiguration)
tablesCfg.tables().get(tblName)).schemas()
- .listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- long causalityToken =
schemasCtx.storageRevision();
-
- if (!busyLock.enterBusy()) {
- fireEvent(
- TableEvent.ALTER,
- new
TableEventParameters(causalityToken, tblId, tblName),
- new NodeStoppingException()
- );
-
- return
CompletableFuture.failedFuture(new NodeStoppingException());
- }
+ UUID tblId = tblCfg.id().value();
- try {
- // Avoid calling listener
immediately after the listener completes to create the current table.
- // FIXME:
https://issues.apache.org/jira/browse/IGNITE-16369
- if (ctx.storageRevision() !=
schemasCtx.storageRevision()) {
- return
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
- TableImpl table =
tablesById.get(tblId);
+ String tblName = tblCfg.name().value();
- ((SchemaRegistryImpl)
table.schemaView())
-
.onSchemaRegistered(
-
SchemaSerializerImpl.INSTANCE.deserialize(
-
(schemasCtx.newValue().schema())));
+ SchemaDescriptor schemaDescriptor =
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
-
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
- table), null);
+ if (!busyLock.enterBusy()) {
+ if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+ fireEvent(
+ TableEvent.ALTER,
+ new TableEventParameters(causalityToken,
tblId, tblName),
+ new NodeStoppingException()
+ );
+ }
- });
- }
+ return CompletableFuture.failedFuture(new
NodeStoppingException());
+ }
- return
CompletableFuture.completedFuture(null);
- } catch (Exception e) {
- fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, tblId,
- tblName), e);
+ try {
+ tablesByIdVv.update(causalityToken, tablesById -> {
+ TableImpl table = tablesById.get(tblId);
- return
CompletableFuture.failedFuture(e);
- } finally {
- busyLock.leaveBusy();
- }
- }
- });
-
- ((ExtendedTableConfiguration)
tablesCfg.tables().get(tblName)).assignments()
- .listen(assignmentsCtx -> {
- if (!busyLock.enterBusy()) {
- return
CompletableFuture.failedFuture(new NodeStoppingException());
- }
-
- try {
- // Avoid calling listener immediately
after the listener completes to create the current table.
- // FIXME:
https://issues.apache.org/jira/browse/IGNITE-16369
- if (ctx.storageRevision() ==
assignmentsCtx.storageRevision()) {
- return
CompletableFuture.completedFuture(null);
- } else {
- return
updateAssignmentInternal(assignmentsCtx.storageRevision(), tblId,
assignmentsCtx);
- }
- } finally {
- busyLock.leaveBusy();
- }
- });
-
- createTableLocally(
- ctx.storageRevision(),
- tblName,
- tblId,
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(((ExtendedTableView) ctx.newValue()).assignments()),
-
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)
ctx.newValue()).schemas()
-
.get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
- );
- }
+ ((SchemaRegistryImpl)
table.schemaView()).onSchemaRegistered(schemaDescriptor);
- private CompletableFuture<?> updateAssignmentInternal(
- long causalityToken,
- UUID tblId,
- ConfigurationNotificationEvent<byte[]>
assignmentsCtx
- ) {
- List<List<ClusterNode>> oldAssignments =
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
- List<List<ClusterNode>> newAssignments =
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(assignmentsCtx.newValue());
-
- CompletableFuture<?>[] futures = new
CompletableFuture<?>[oldAssignments.size()];
-
- // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
- // TODO: Until IGNITE-15554 is implemented it's safe
to iterate over partitions and replicas cause there will
- // TODO: be exact same amount of partitions and
replicas for both old and new assignments
- for (int i = 0; i < oldAssignments.size(); i++) {
- int partId = i;
-
- List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(partId);
- List<ClusterNode> newPartitionAssignment =
newAssignments.get(partId);
-
- var toAdd = new HashSet<>(newPartitionAssignment);
-
- toAdd.removeAll(oldPartitionAssignment);
-
- // Create new raft nodes according to new
assignments.
- futures[i] =
tablesByIdVv.get(causalityToken).thenCompose(tablesById -> {
- InternalTable internalTable =
tablesById.get(tblId).internalTable();
-
- try {
- return raftMgr.updateRaftGroup(
- raftGroupName(tblId, partId),
- newPartitionAssignment,
- toAdd,
- () -> new PartitionListener(tblId,
- new
VersionedRowStore(internalTable.storage().getOrCreatePartition(partId),
- txManager))
- ).thenAccept(
- updatedRaftGroupService ->
((InternalTableImpl) internalTable)
-
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
- ).exceptionally(th -> {
- LOG.error("Failed to update raft
groups one the node", th);
-
- return null;
- });
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped
before Table manager", e);
- }
- });
+ if (schemaDescriptor.version() !=
INITIAL_SCHEMA_VERSION) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, table), null);
}
- return CompletableFuture.allOf(futures);
+ return tablesById;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a schema
for table"
+ + " [tableId={}, schemaVer={}]", tblId,
schemaDescriptor.version()), th);
+ });
+
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, tblId, tblName), e);
}
- @Override
- public CompletableFuture<?> onRename(String oldName,
String newName, ConfigurationNotificationEvent<TableView> ctx) {
- // TODO: IGNITE-15485 Support table rename operation.
+ return CompletableFuture.failedFuture(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
- return CompletableFuture.completedFuture(null);
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).assignments().listen(assignmentsCtx -> {
+ long causalityToken = assignmentsCtx.storageRevision();
- @Override
- public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.oldValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.oldValue()).id();
+ ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration)
assignmentsCtx.config(TableConfiguration.class);
- fireEvent(
- TableEvent.DROP,
- new
TableEventParameters(ctx.storageRevision(), tblId, tblName),
- new NodeStoppingException()
- );
+ UUID tblId = tblCfg.id().value();
- return CompletableFuture.failedFuture(new
NodeStoppingException());
- }
+ if (!busyLock.enterBusy()) {
+ return CompletableFuture.failedFuture(new
NodeStoppingException());
+ }
- try {
- dropTableLocally(
- ctx.storageRevision(),
- ctx.oldValue().name(),
- ((ExtendedTableView) ctx.oldValue()).id(),
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(((ExtendedTableView) ctx.oldValue()).assignments())
- );
- } finally {
- busyLock.leaveBusy();
- }
+ try {
+ return updateAssignmentInternal(causalityToken, tblId,
assignmentsCtx);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
- return CompletableFuture.completedFuture(null);
- }
- });
+ tablesCfg.tables().listenElements(new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
+ if (!busyLock.enterBusy()) {
+ String tblName = ctx.newValue().name();
+ UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
+
+ fireEvent(TableEvent.CREATE,
+ new TableEventParameters(ctx.storageRevision(),
tblId, tblName),
+ new NodeStoppingException()
+ );
+
+ return CompletableFuture.failedFuture(new
NodeStoppingException());
+ }
+
+ try {
+ onTableCreateInternal(ctx);
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /**
+ * Method for handle a table configuration event.
+ *
+ * @param ctx Configuration event.
+ */
+ private void
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
+ String tblName = ctx.newValue().name();
+ UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
+
+ // Empty assignments might be a valid case if tables are
created from within cluster init HOCON
+ // configuration, which is not supported now.
+ assert ((ExtendedTableView) ctx.newValue()).assignments() !=
null :
+ IgniteStringFormatter.format("Table [id={}, name={}]
has empty assignments.", tblId, tblName);
+
+ createTableLocally(
+ ctx.storageRevision(),
+ tblName,
+ tblId,
+ ((ExtendedTableView) ctx.newValue()).partitions()
Review comment:
This cast looks redundant.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
/** {@inheritDoc} */
@Override
public void start() {
- tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.newValue()).id();
-
- fireEvent(TableEvent.CREATE,
- new
TableEventParameters(ctx.storageRevision(), tblId, tblName),
- new NodeStoppingException()
- );
-
- return CompletableFuture.failedFuture(new
NodeStoppingException());
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- try {
- onTableCreateInternal(ctx);
- } finally {
- busyLock.leaveBusy();
- }
+ long causalityToken = schemasCtx.storageRevision();
- return CompletableFuture.completedFuture(null);
- }
+ ExtendedTableConfiguration tblCfg =
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
- /**
- * Method for handle a table configuration event.
- *
- * @param ctx Configuration event.
- */
- private void
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
- // Empty assignments might be a valid case if tables
are created from within cluster init HOCON
- // configuration, which is not supported now.
- assert ((ExtendedTableView)
ctx.newValue()).assignments() != null :
- IgniteStringFormatter.format("Table [id={},
name={}] has empty assignments.", tblId, tblName);
-
- // TODO: IGNITE-16369 Listener with any placeholder
should be used instead.
- ((ExtendedTableConfiguration)
tablesCfg.tables().get(tblName)).schemas()
- .listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- long causalityToken =
schemasCtx.storageRevision();
-
- if (!busyLock.enterBusy()) {
- fireEvent(
- TableEvent.ALTER,
- new
TableEventParameters(causalityToken, tblId, tblName),
- new NodeStoppingException()
- );
-
- return
CompletableFuture.failedFuture(new NodeStoppingException());
- }
+ UUID tblId = tblCfg.id().value();
- try {
- // Avoid calling listener
immediately after the listener completes to create the current table.
- // FIXME:
https://issues.apache.org/jira/browse/IGNITE-16369
- if (ctx.storageRevision() !=
schemasCtx.storageRevision()) {
- return
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
- TableImpl table =
tablesById.get(tblId);
+ String tblName = tblCfg.name().value();
- ((SchemaRegistryImpl)
table.schemaView())
-
.onSchemaRegistered(
-
SchemaSerializerImpl.INSTANCE.deserialize(
-
(schemasCtx.newValue().schema())));
+ SchemaDescriptor schemaDescriptor =
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
-
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
- table), null);
+ if (!busyLock.enterBusy()) {
+ if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+ fireEvent(
+ TableEvent.ALTER,
+ new TableEventParameters(causalityToken,
tblId, tblName),
+ new NodeStoppingException()
+ );
Review comment:
Why don't we need to wait for the versioned value updates here?
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
/** {@inheritDoc} */
@Override
public void start() {
- tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.newValue()).id();
-
- fireEvent(TableEvent.CREATE,
- new
TableEventParameters(ctx.storageRevision(), tblId, tblName),
- new NodeStoppingException()
- );
-
- return CompletableFuture.failedFuture(new
NodeStoppingException());
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- try {
- onTableCreateInternal(ctx);
- } finally {
- busyLock.leaveBusy();
- }
+ long causalityToken = schemasCtx.storageRevision();
- return CompletableFuture.completedFuture(null);
- }
+ ExtendedTableConfiguration tblCfg =
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
Review comment:
It seems this cast is redundant.
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -199,212 +199,161 @@ public TableManager(
/** {@inheritDoc} */
@Override
public void start() {
- tablesCfg.tables()
- .listenElements(new ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.newValue()).id();
-
- fireEvent(TableEvent.CREATE,
- new
TableEventParameters(ctx.storageRevision(), tblId, tblName),
- new NodeStoppingException()
- );
-
- return CompletableFuture.failedFuture(new
NodeStoppingException());
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).schemas().listenElements(new
ConfigurationNamedListListener<>() {
+ @Override
+ public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- try {
- onTableCreateInternal(ctx);
- } finally {
- busyLock.leaveBusy();
- }
+ long causalityToken = schemasCtx.storageRevision();
- return CompletableFuture.completedFuture(null);
- }
+ ExtendedTableConfiguration tblCfg =
(ExtendedTableConfiguration) schemasCtx.config(TableConfiguration.class);
- /**
- * Method for handle a table configuration event.
- *
- * @param ctx Configuration event.
- */
- private void
onTableCreateInternal(ConfigurationNotificationEvent<TableView> ctx) {
- String tblName = ctx.newValue().name();
- UUID tblId = ((ExtendedTableView) ctx.newValue()).id();
-
- // Empty assignments might be a valid case if tables
are created from within cluster init HOCON
- // configuration, which is not supported now.
- assert ((ExtendedTableView)
ctx.newValue()).assignments() != null :
- IgniteStringFormatter.format("Table [id={},
name={}] has empty assignments.", tblId, tblName);
-
- // TODO: IGNITE-16369 Listener with any placeholder
should be used instead.
- ((ExtendedTableConfiguration)
tablesCfg.tables().get(tblName)).schemas()
- .listenElements(new
ConfigurationNamedListListener<>() {
- @Override
- public CompletableFuture<?>
onCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- long causalityToken =
schemasCtx.storageRevision();
-
- if (!busyLock.enterBusy()) {
- fireEvent(
- TableEvent.ALTER,
- new
TableEventParameters(causalityToken, tblId, tblName),
- new NodeStoppingException()
- );
-
- return
CompletableFuture.failedFuture(new NodeStoppingException());
- }
+ UUID tblId = tblCfg.id().value();
- try {
- // Avoid calling listener
immediately after the listener completes to create the current table.
- // FIXME:
https://issues.apache.org/jira/browse/IGNITE-16369
- if (ctx.storageRevision() !=
schemasCtx.storageRevision()) {
- return
tablesByIdVv.get(causalityToken).thenAccept(tablesById -> {
- TableImpl table =
tablesById.get(tblId);
+ String tblName = tblCfg.name().value();
- ((SchemaRegistryImpl)
table.schemaView())
-
.onSchemaRegistered(
-
SchemaSerializerImpl.INSTANCE.deserialize(
-
(schemasCtx.newValue().schema())));
+ SchemaDescriptor schemaDescriptor =
SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
-
fireEvent(TableEvent.ALTER, new TableEventParameters(causalityToken,
- table), null);
+ if (!busyLock.enterBusy()) {
+ if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+ fireEvent(
+ TableEvent.ALTER,
+ new TableEventParameters(causalityToken,
tblId, tblName),
+ new NodeStoppingException()
+ );
+ }
- });
- }
+ return CompletableFuture.failedFuture(new
NodeStoppingException());
+ }
- return
CompletableFuture.completedFuture(null);
- } catch (Exception e) {
- fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, tblId,
- tblName), e);
+ try {
+ tablesByIdVv.update(causalityToken, tablesById -> {
+ TableImpl table = tablesById.get(tblId);
- return
CompletableFuture.failedFuture(e);
- } finally {
- busyLock.leaveBusy();
- }
- }
- });
-
- ((ExtendedTableConfiguration)
tablesCfg.tables().get(tblName)).assignments()
- .listen(assignmentsCtx -> {
- if (!busyLock.enterBusy()) {
- return
CompletableFuture.failedFuture(new NodeStoppingException());
- }
-
- try {
- // Avoid calling listener immediately
after the listener completes to create the current table.
- // FIXME:
https://issues.apache.org/jira/browse/IGNITE-16369
- if (ctx.storageRevision() ==
assignmentsCtx.storageRevision()) {
- return
CompletableFuture.completedFuture(null);
- } else {
- return
updateAssignmentInternal(assignmentsCtx.storageRevision(), tblId,
assignmentsCtx);
- }
- } finally {
- busyLock.leaveBusy();
- }
- });
-
- createTableLocally(
- ctx.storageRevision(),
- tblName,
- tblId,
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(((ExtendedTableView) ctx.newValue()).assignments()),
-
SchemaSerializerImpl.INSTANCE.deserialize(((ExtendedTableView)
ctx.newValue()).schemas()
-
.get(String.valueOf(INITIAL_SCHEMA_VERSION)).schema())
- );
- }
+ ((SchemaRegistryImpl)
table.schemaView()).onSchemaRegistered(schemaDescriptor);
- private CompletableFuture<?> updateAssignmentInternal(
- long causalityToken,
- UUID tblId,
- ConfigurationNotificationEvent<byte[]>
assignmentsCtx
- ) {
- List<List<ClusterNode>> oldAssignments =
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(assignmentsCtx.oldValue());
-
- List<List<ClusterNode>> newAssignments =
- (List<List<ClusterNode>>)
ByteUtils.fromBytes(assignmentsCtx.newValue());
-
- CompletableFuture<?>[] futures = new
CompletableFuture<?>[oldAssignments.size()];
-
- // TODO: IGNITE-15554 Add logic for assignment
recalculation in case of partitions or replicas changes
- // TODO: Until IGNITE-15554 is implemented it's safe
to iterate over partitions and replicas cause there will
- // TODO: be exact same amount of partitions and
replicas for both old and new assignments
- for (int i = 0; i < oldAssignments.size(); i++) {
- int partId = i;
-
- List<ClusterNode> oldPartitionAssignment =
oldAssignments.get(partId);
- List<ClusterNode> newPartitionAssignment =
newAssignments.get(partId);
-
- var toAdd = new HashSet<>(newPartitionAssignment);
-
- toAdd.removeAll(oldPartitionAssignment);
-
- // Create new raft nodes according to new
assignments.
- futures[i] =
tablesByIdVv.get(causalityToken).thenCompose(tablesById -> {
- InternalTable internalTable =
tablesById.get(tblId).internalTable();
-
- try {
- return raftMgr.updateRaftGroup(
- raftGroupName(tblId, partId),
- newPartitionAssignment,
- toAdd,
- () -> new PartitionListener(tblId,
- new
VersionedRowStore(internalTable.storage().getOrCreatePartition(partId),
- txManager))
- ).thenAccept(
- updatedRaftGroupService ->
((InternalTableImpl) internalTable)
-
.updateInternalTableRaftGroupService(partId, updatedRaftGroupService)
- ).exceptionally(th -> {
- LOG.error("Failed to update raft
groups one the node", th);
-
- return null;
- });
- } catch (NodeStoppingException e) {
- throw new AssertionError("Loza was stopped
before Table manager", e);
- }
- });
+ if (schemaDescriptor.version() !=
INITIAL_SCHEMA_VERSION) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, table), null);
}
- return CompletableFuture.allOf(futures);
+ return tablesById;
+ }, th -> {
+ throw new
IgniteInternalException(IgniteStringFormatter.format("Cannot create a schema
for table"
+ + " [tableId={}, schemaVer={}]", tblId,
schemaDescriptor.version()), th);
+ });
+
+ return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ if (schemaDescriptor.version() != INITIAL_SCHEMA_VERSION) {
+ fireEvent(TableEvent.ALTER, new
TableEventParameters(causalityToken, tblId, tblName), e);
}
- @Override
- public CompletableFuture<?> onRename(String oldName,
String newName, ConfigurationNotificationEvent<TableView> ctx) {
- // TODO: IGNITE-15485 Support table rename operation.
+ return CompletableFuture.failedFuture(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+ });
- return CompletableFuture.completedFuture(null);
- }
+ ((ExtendedTableConfiguration)
tablesCfg.tables().any()).assignments().listen(assignmentsCtx -> {
+ long causalityToken = assignmentsCtx.storageRevision();
- @Override
- public CompletableFuture<?>
onDelete(ConfigurationNotificationEvent<TableView> ctx) {
- if (!busyLock.enterBusy()) {
- String tblName = ctx.oldValue().name();
- UUID tblId = ((ExtendedTableView)
ctx.oldValue()).id();
+ ExtendedTableConfiguration tblCfg = (ExtendedTableConfiguration)
assignmentsCtx.config(TableConfiguration.class);
Review comment:
This cast looks redundant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]