korlov42 commented on a change in pull request #8858:
URL: https://github.com/apache/ignite/pull/8858#discussion_r596916673



##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -115,7 +116,7 @@ private void flush() throws Exception {
 
         inLoop = true;
         try {
-            while (requested > 0 && !rows.isEmpty()) {
+            while (requested > 0 && !rows.isEmpty() && !isClosed()) {

Review comment:
       this should be done for every node

##########
File path: 
modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
##########
@@ -89,6 +89,20 @@ public void execute(Runnable task, int idx) {
         execs[threadId(idx)].execute(task);
     }
 
+    /**
+     * Submits a Runnable task for execution and returns a Future representing 
that task.
+     * The command with the same {@code index} will be executed in the same 
thread.
+     *
+     * @param task the task to submit
+     * @return a Future representing pending completion of the task
+     * @throws RejectedExecutionException if the task cannot be
+     *         scheduled for execution
+     * @throws NullPointerException if the task is null
+     */
+    public Future<?> submit(Runnable task, int idx) {
+        return execs[threadId(idx)].submit(task);
+    }

Review comment:
       ```suggestion
       /**
        * Submits a {@link Runnable} task for execution and returns a {@link 
Future} representing that task.
        * The command with the same {@code index} will be executed in the same 
thread.
        *
        * @param task The task to submit.
        * @return a {@link Future} representing pending completion of the task.
        * @throws RejectedExecutionException if the task cannot be
        *         scheduled for execution.
        * @throws NullPointerException if the task is {@code null}.
        */
       public Future<?> submit(Runnable task, int idx) {
           return execs[threadId(idx)].submit(task);
       }
   ```

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
##########
@@ -116,12 +122,18 @@ public MessageService messageService() {
 
     /** {@inheritDoc} */
     @Override public void closeOutbox(UUID nodeId, UUID qryId, long 
fragmentId, long exchangeId) throws IgniteCheckedException {
-        messageService().send(nodeId, new OutboxCloseMessage(qryId, 
fragmentId, exchangeId));
+        if (messageService().localNode().equals(nodeId))

Review comment:
       This routing is already handled by messageService.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
##########
@@ -157,13 +169,31 @@ public MessageService messageService() {
     /** */
     protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
         Collection<Inbox<?>> inboxes = 
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
-        if (!F.isEmpty(inboxes)) {
-            for (Inbox<?> inbox : inboxes) {
-                inbox.context().cancel();
-                inbox.context().execute(inbox::close, inbox::onError);

Review comment:
       Actually, it's OK to close inboxes asynchronously, but we must not cancel 
the execution context here.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -1030,6 +1028,18 @@ private void tryClose() {
                     }
                 }
 
+                if (!remotes.contains(messageService().localNode())) {

Review comment:
       local inbox will be closed within the closing of the root fragment

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
##########
@@ -121,6 +122,7 @@ public MailboxRegistryImpl(GridKernalContext ctx) {
     @Override public Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long 
fragmentId, long exchangeId) {
         return remotes.values().stream()
             .filter(makeFilter(qryId, fragmentId, exchangeId))
+            .filter(node -> !node.isClosed())

Review comment:
       Why did you decide to add filtering for closed inboxes?

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/QueryTaskExecutor.java
##########
@@ -33,4 +34,16 @@
      * @param qryTask Query task.
      */
     void execute(UUID qryId, long fragmentId, Runnable qryTask);
+
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's {@code get} method will
+     * return {@code null} upon <em>successful</em> completion.
+     *
+     * @param qryId query Id.
+     * @param fragmentId  fragment Id.
+     * @param qryTask the task to submit.
+     * @return a Future representing pending task
+     */

Review comment:
       ```suggestion
       /**
        * Submits a {@link Runnable} task for execution and returns a {@link 
Future}
        * representing that task. The Future's {@link Future#get() get} method 
will
        * return {@code null} upon <em>successful</em> completion.
        *
        * @param qryId Id of the query this task was created for.
        * @param fragmentId Id of the particular fragment this task was created for.
        * @param qryTask The task to submit.
        * @return A {@link Future} representing pending task.
        */
   ```

##########
File path: 
modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
##########
@@ -53,6 +62,19 @@
     /** */
     private static IgniteEx client;
 
+    private ListeningTestLogger listeningLog = new ListeningTestLogger(log);

Review comment:
       javadocs here and below

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Outbox.java
##########
@@ -174,8 +176,12 @@ public void init() {
         registry.unregister(this);
 
         // Send cancel message for the Inbox to close Inboxes created by batch 
message race.
-        for (UUID node : dest.targets())
-            getOrCreateBuffer(node).close();
+        for (UUID node : dest.targets()) {

Review comment:
       Are you sure about this? Right now it seems impossible for a target not 
to have a buffer. But if that ever became possible, some targets would not 
get a close message.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -951,7 +951,7 @@ private RemoteFragmentKey(UUID nodeId, long fragmentId) {
         private final Set<RemoteFragmentKey> waiting;
 
         /** */
-        private QueryState state;
+        private volatile QueryState state;

Review comment:
       `volatile` is unnecessary, since all interactions with `state` happen 
inside synchronized blocks.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -20,6 +20,7 @@
 import java.util.PriorityQueue;
 
 import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;

Review comment:
       unrelated change 

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
##########
@@ -32,6 +33,7 @@
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;

Review comment:
       unrelated changes 

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -242,6 +243,25 @@ public void execute(RunnableX task, Consumer<Throwable> 
onError) {
         });
     }
 
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's {@code get} method will
+     * return {@code null} upon <em>successful</em> completion.
+     *
+     * @param task the task to submit.
+     * @return a Future representing pending task
+     */
+    public Future<?> submit(RunnableX task, Consumer<Throwable> onError) {
+        return executor.submit(qryId, fragmentId(), () -> {
+            try {
+                task.run();
+            }
+            catch (Throwable t) {
+                onError.accept(t);

Review comment:
       The exception should be rethrown, as in the `execute` method.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
##########
@@ -175,13 +205,40 @@ else if (log.isDebugEnabled()) {
     /** */
     protected void onMessage(UUID nodeId, OutboxCloseMessage msg) {
         Collection<Outbox<?>> outboxes = 
mailboxRegistry().outboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
-        if (!F.isEmpty(outboxes)) {
-            for (Outbox<?> outbox : outboxes) {
-                outbox.context().cancel();
-                outbox.context().execute(outbox::close, outbox::onError);
+
+        List<Future<?>> futs = new ArrayList<>(outboxes.size());
+
+        Set<ExecutionContext<?>> ctxs = new HashSet<>();
+
+        for (Outbox<?> outbox : outboxes) {
+            Future<?> fut = outbox.context().submit(outbox::close, 
outbox::onError);
+
+            futs.add(fut);
+
+            ctxs.add(outbox.context());
+        }
+
+        for (Future<?> fut : futs) {
+            try {
+                fut.get();
+            }
+            catch (InterruptedException | ExecutionException e) {
+                Thread.currentThread().interrupt();
+
+                U.warn(log, e);
             }
         }
-        else if (log.isDebugEnabled()) {
+
+        try {

Review comment:
       why do we close inbox here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to