denis-chudov commented on code in PR #977:
URL: https://github.com/apache/ignite-3/pull/977#discussion_r940093757


##########
modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java:
##########
@@ -224,6 +227,15 @@ public T latest() {
         return getDefault();
     }
 
+    /**
+     * Waits for the latest value of a future.
+     */
+    public T waitForLatest(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   I think it's worth describing the parameters and exceptions in the Javadoc.
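
A minimal sketch of what fuller Javadoc could look like. `LatestValueHolder` is a hypothetical stand-in, not the real `VersionedValue`, and the sketch assumes the method simply delegates to `CompletableFuture.get(timeout, unit)`, so the wording of each tag would need to match the actual behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a versioned holder; NOT the real VersionedValue. */
class LatestValueHolder<T> {
    /** Future that is completed with the most recent value (assumption of this sketch). */
    private final CompletableFuture<T> latest;

    LatestValueHolder(CompletableFuture<T> latest) {
        this.latest = latest;
    }

    /**
     * Waits for the latest value of a future.
     *
     * @param timeout Maximum time to wait.
     * @param unit Time unit of the {@code timeout} argument.
     * @return The latest value.
     * @throws ExecutionException If the underlying future completed exceptionally.
     * @throws InterruptedException If the current thread was interrupted while waiting.
     * @throws TimeoutException If the value was not available within the timeout.
     */
    T waitForLatest(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
        return latest.get(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        LatestValueHolder<String> holder = new LatestValueHolder<>(CompletableFuture.completedFuture("v1"));

        System.out.println(holder.waitForLatest(1, TimeUnit.SECONDS));
    }
}
```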



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1063,17 +1107,33 @@ public CompletableFuture<List<Table>> tablesAsync() {
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
         // TODO: IGNITE-16288 directTableIds should use async configuration API
-        return CompletableFuture.supplyAsync(this::directTableIds)
-                .thenCompose(tableIds -> {
-                    var tableFuts = new CompletableFuture[tableIds.size()];
+        return CompletableFuture.supplyAsync(() -> {
+            if (!busyLock.enterBusy()) {
+                throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
+            }
+            try {
+                return directTableIds();
+            } finally {
+                busyLock.leaveBusy();
+            }
+        }).thenCompose(tableIds -> {
+            if (!busyLock.enterBusy()) {
+                throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
+            }
+            try {
+                var tableFuts = new CompletableFuture[tableIds.size()];

Review Comment:
   Shouldn't we add a wrapper, like `withBusyLock(busyLock, lambda)`? It might 
make the code more compact.
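
A rough sketch of such a wrapper, under stated assumptions: `SimpleBusyLock` is a hypothetical stand-in that only mimics `enterBusy()`, `leaveBusy()` and `block()`, and `IllegalStateException` stands in for `new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException())`. The real signature and placement of `withBusyLock` would be up to the author.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

class BusyLockSketch {
    /** Hypothetical stand-in for the busy lock; not the project's implementation. */
    static final class SimpleBusyLock {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile boolean blocked;

        /** Tries to enter the busy section; returns false once the lock is blocked. */
        boolean enterBusy() {
            if (!lock.readLock().tryLock()) {
                return false;
            }
            if (blocked) {
                lock.readLock().unlock();
                return false;
            }
            return true;
        }

        void leaveBusy() {
            lock.readLock().unlock();
        }

        /** Waits for in-flight busy sections to finish, then rejects new ones. */
        void block() {
            lock.writeLock().lock();
            blocked = true;
            lock.writeLock().unlock();
        }
    }

    /** Runs the action under the busy lock, or fails if the node is stopping. */
    static <T> T withBusyLock(SimpleBusyLock busyLock, Supplier<T> action) {
        if (!busyLock.enterBusy()) {
            // Stand-in for IgniteException(NODE_STOPPING_ERR, new NodeStoppingException()).
            throw new IllegalStateException("Node is stopping");
        }
        try {
            return action.get();
        } finally {
            busyLock.leaveBusy();
        }
    }

    public static void main(String[] args) {
        SimpleBusyLock busyLock = new SimpleBusyLock();

        System.out.println(withBusyLock(busyLock, () -> "tables loaded"));

        busyLock.block();

        try {
            withBusyLock(busyLock, () -> "never runs");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a helper like this, each stage in `tablesAsyncInternal()` could shrink to a single call, e.g. `supplyAsync(() -> withBusyLock(busyLock, this::directTableIds))`.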



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -598,7 +622,23 @@ public void stop() {
 
         busyLock.block();
 
-        Map<UUID, TableImpl> tables = tablesByIdVv.latest();
+        Map<UUID, TableImpl> tables;
+
+        try {
+            // Waiting for tablesByIdVv generally can be unbounded, because we don't have time limits for receiving storage revision update,
+            // where tablesByIdVv is completed, so we add timeout here.
+            tables = tablesByIdVv.waitForLatest(TABLES_COMPLETE_TIMEOUT, TimeUnit.SECONDS);
+        } catch (ExecutionException | InterruptedException | TimeoutException e) {
+            throw new IgniteException(TABLE_NOT_COMPLETED_ERR);
+        }
+
+        cleanUpTablesResources(tables);

Review Comment:
   Shouldn't we save some context for the tables that are being created? We 
already have the futures for them (`beforeTablesVvComplete`), and they could be 
enriched.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java:
##########
@@ -87,20 +94,33 @@ public SqlSchemaManagerImpl(
         });
 
         schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
-            if (throwable != null) {
+            if (!busyLock.enterBusy()) {
                 calciteSchemaVv.completeExceptionally(
                         token,
-                        new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
+                        new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token,

Review Comment:
   Maybe we shouldn't use deprecated constructors in new code.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java:
##########
@@ -214,50 +195,58 @@ public synchronized CompletableFuture<?> onTableCreated(
             TableImpl table,
             long causalityToken
     ) {
-        schemasVv.update(
-                causalityToken,
-                (schemas, e) -> {
-                    if (e != null) {
-                        return failedFuture(e);
-                    }
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+        }
+        try {
+            schemasVv.update(
+                    causalityToken,
+                    (schemas, e) -> {
+                        if (e != null) {
+                            return failedFuture(e);
+                        }
 
-                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
+                        Map<String, IgniteSchema> res = new HashMap<>(schemas);
 
-                    IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+                        IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
 
-                    CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
+                        CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
 
-                    return tablesVv
-                            .update(
-                                    causalityToken,
-                                    (tables, ex) -> {
-                                        if (ex != null) {
-                                            return failedFuture(ex);
+                        return tablesVv
+                                .update(
+                                        causalityToken,
+                                        (tables, ex) -> {
+                                            if (ex != null) {
+                                                return failedFuture(ex);
+                                            }
+
+                                            Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+
+                                            return igniteTableFuture
+                                                    .thenApply(igniteTable -> {
+                                                        resTbls.put(igniteTable.id(), igniteTable);
+
+                                                        return resTbls;
+                                                    });

Review Comment:
   This will be the asynchronous part of this update; it possibly needs to be 
wrapped with the busy lock.
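
To illustrate the concern: a `thenApply` callback runs when its source future completes, which can be after the enclosing method has already left the busy lock. The sketch below guards the callback separately. All names are hypothetical (`addTableLater`, `SimpleBusyLock`), string-keyed maps stand in for the real table maps, and `IllegalStateException` stands in for the node-stopping exception.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class AsyncGuardSketch {
    /** Hypothetical stand-in for the busy lock; not the project's implementation. */
    static final class SimpleBusyLock {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private volatile boolean blocked;

        boolean enterBusy() {
            if (!lock.readLock().tryLock()) {
                return false;
            }
            if (blocked) {
                lock.readLock().unlock();
                return false;
            }
            return true;
        }

        void leaveBusy() {
            lock.readLock().unlock();
        }

        void block() {
            lock.writeLock().lock();
            blocked = true;
            lock.writeLock().unlock();
        }
    }

    /** Registers a table once its future completes; both stages are guarded. */
    static CompletableFuture<Map<String, String>> addTableLater(
            SimpleBusyLock busyLock,
            Map<String, String> tables,
            CompletableFuture<String> tableFuture
    ) {
        if (!busyLock.enterBusy()) {
            return CompletableFuture.failedFuture(new IllegalStateException("Node is stopping"));
        }
        try {
            Map<String, String> res = new HashMap<>(tables);

            return tableFuture.thenApply(name -> {
                // May run after the outer finally block, so it takes the lock again.
                if (!busyLock.enterBusy()) {
                    throw new IllegalStateException("Node is stopping");
                }
                try {
                    res.put(name, name);

                    return res;
                } finally {
                    busyLock.leaveBusy();
                }
            });
        } finally {
            busyLock.leaveBusy();
        }
    }

    public static void main(String[] args) {
        SimpleBusyLock busyLock = new SimpleBusyLock();
        CompletableFuture<String> pending = new CompletableFuture<>();

        CompletableFuture<Map<String, String>> result = addTableLater(busyLock, new HashMap<>(), pending);

        busyLock.block();        // The node starts stopping before the table is ready.
        pending.complete("t1");  // The callback now runs and is rejected by the guard.

        System.out.println("failed: " + result.isCompletedExceptionally());
    }
}
```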



##########
modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java:
##########
@@ -336,11 +345,20 @@ private SchemaDescriptor getSchemaDescriptorLocally(int schemaVer, ExtendedTable
      */
     public CompletableFuture<SchemaRegistry> schemaRegistry(long causalityToken, @Nullable UUID tableId) {
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
         }
 
         try {
-            return registriesVv.get(causalityToken).thenApply(regs -> tableId == null ? null : regs.get(tableId));
+            return registriesVv.get(causalityToken).thenApply(regs -> {
+                if (!busyLock.enterBusy()) {
+                    throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
+                }
+                try {
+                    return tableId == null ? null : regs.get(tableId);

Review Comment:
   I am not sure the case when `tableId` is null is still really needed; maybe 
we can make it `@NotNull`?


