sanpwc commented on a change in pull request #330:
URL: https://github.com/apache/ignite-3/pull/330#discussion_r709410163
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
##########
@@ -566,52 +423,84 @@ private void dropTableLocally(String name, UUID tblId,
List<List<ClusterNode>> a
* {@code false} means the existing table will be returned.
* @return A table instance.
*/
- private CompletableFuture<Table> createTableAsync(String name,
Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
+ private CompletableFuture<Table> createTableAsync(
+ String name,
+ Consumer<TableChange> tableInitChange,
+ boolean exceptionWhenExist
+ ) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
- EventListener<TableEventParameters> clo = new EventListener<>() {
- @Override public boolean notify(@NotNull TableEventParameters
parameters, @Nullable Throwable e) {
- String tableName = parameters.tableName();
-
- if (!name.equals(tableName))
- return false;
-
- if (e == null)
- tblFut.complete(parameters.table());
- else
- tblFut.completeExceptionally(e);
-
- return true;
- }
-
- @Override public void remove(@NotNull Throwable e) {
- tblFut.completeExceptionally(e);
- }
- };
-
- listen(TableEvent.CREATE, clo);
-
tableAsync(name, true).thenAccept(tbl -> {
if (tbl != null) {
if (exceptionWhenExist) {
- removeListener(TableEvent.CREATE, clo, new
IgniteInternalCheckedException(
- LoggerMessageHelper.format("Table already exists
[name={}]", name)));
- } else if (tblFut.complete(tbl))
- removeListener(TableEvent.CREATE, clo);
- } else {
- try {
- clusterCfgMgr
- .configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY)
- .tables()
- .change(change -> change.create(name, tableInitChange))
- .get();
+ tblFut.completeExceptionally(new
IgniteInternalCheckedException(
+ LoggerMessageHelper.format("Table already exists
[name={}]", name)));
}
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Table wasn't created [name=" + name + ']', e);
+ else
+ tblFut.complete(tbl);
+ }
+ else {
+ IgniteUuid tblId = TABLE_ID_GENERATOR.randomUuid();
- removeListener(TableEvent.CREATE, clo, new
IgniteInternalCheckedException(e));
- }
+ createTblIntention.put(tblId, new CompletableFuture<>());
+
+ clusterCfgMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .change(
+ change -> change.create(
+ name,
+ (ch) -> {
+ tableInitChange.accept(ch);
+ ((ExtendedTableChange)ch).
+ // Table id specification.
+ changeId(tblId.toString()).
+ // Affinity assignments calculation.
+ changeAssignments(
+ ByteUtils.toBytes(
+
AffinityService.calculateAssignments(
+ baselineMgr.nodes(),
+ ch.partitions(),
+ ch.replicas()
+ )
+ )
+ ).
+ // Table schema preparation.
+ changeSchemas(
+ schemasCh -> schemasCh.create(
+
String.valueOf(INITIAL_SCHEMA_VERSION),
+ schemaCh -> schemaCh.changeSchema(
+ ByteUtils.toBytes(
+
SchemaService.prepareSchemaDescriptor(
+
((ExtendedTableView)ch).schemas().size(),
+ ch
+ )
+ )
+ )
+ )
+ );
+ }
+ )
+ )
+ .thenRun(() ->
createTblIntention.get(tblId).thenApply(tblFut::complete)
+ .thenRun(() -> createTblIntention.remove(tblId))
+ .exceptionally(throwable -> {
+ createTblIntention.remove(tblId);
+
+ tblFut.completeExceptionally(new
IgniteException(throwable));
+
+ return null;
+ }))
+ .exceptionally(throwable -> {
+ LOG.error("Table wasn't created [name=" + name + ']',
throwable);
Review comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]