korlov42 commented on a change in pull request #8858:
URL: https://github.com/apache/ignite/pull/8858#discussion_r611233758



##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
##########
@@ -283,7 +283,8 @@ public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, Comparator<Row>
         @Override protected void join() throws Exception {
             inLoop = true;
             try {
-                while (requested > 0 && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty() || rightMaterialization != null)) {
+                while (requested > 0 && !isClosed() && (left != null || !leftInBuf.isEmpty()) && (right != null || !rightInBuf.isEmpty()

Review comment:
       MergeJoin, like NestedLoop, has several join algorithms, so this check 
should be added to all of them.

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionContext.java
##########
@@ -242,6 +247,30 @@ public void execute(RunnableX task, Consumer<Throwable> onError) {
         });
     }
 
+    /**
+     * Submits a Runnable task for execution and returns a Future
+     * representing that task. The Future's {@code get} method will
+     * return {@code null} upon <em>successful</em> completion.
+     *
+     * @param task the task to submit.
+     * @return a Future representing pending task
+     */
+    public Future<?> submit(RunnableX task, Consumer<Throwable> onError) {
+        if (isCancelled())

Review comment:
       No task should be scheduled after the context is cancelled, so let's 
verify this with an assertion.
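
A minimal sketch of the assertion idea, under stated assumptions: `SketchContext` and its single-thread executor are hypothetical stand-ins, not the actual `ExecutionContext` internals, and the real method also takes an `onError` consumer that is omitted here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the execution context; not the Ignite class. */
class SketchContext {
    private final AtomicBoolean cancelled = new AtomicBoolean();

    /** Assumption: a single-thread executor is enough for illustration. */
    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    boolean isCancelled() {
        return cancelled.get();
    }

    void cancel() {
        cancelled.set(true);
        exec.shutdown();
    }

    Future<?> submit(Runnable task) {
        // Scheduling after cancellation is treated as a caller bug,
        // so it is asserted rather than silently ignored.
        assert !isCancelled() : "Task submitted after the context was cancelled";

        return exec.submit(task);
    }
}
```

The assertion only fires when the JVM runs with `-ea`, which is the usual setup for tests, so production behaviour stays unchanged.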

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
##########
@@ -116,12 +122,18 @@ public MessageService messageService() {
 
     /** {@inheritDoc} */
     @Override public void closeOutbox(UUID nodeId, UUID qryId, long fragmentId, long exchangeId) throws IgniteCheckedException {
-        messageService().send(nodeId, new OutboxCloseMessage(qryId, fragmentId, exchangeId));
+        if (messageService().localNode().equals(nodeId))

Review comment:
       Why can't it be async?

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/SortNode.java
##########
@@ -115,7 +116,7 @@ private void flush() throws Exception {
 
         inLoop = true;
         try {
-            while (requested > 0 && !rows.isEmpty()) {
+            while (requested > 0 && !rows.isEmpty() && !isClosed()) {

Review comment:
       I was wrong about this. The `close` flag is set in the same thread, 
so there is no need to check on every loop iteration whether the node was 
closed; checking once at the beginning of the task is enough.
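
Roughly what I have in mind, as a simplified sketch: `SketchNode` is a hypothetical class that only borrows the field names from the diff, and it assumes the `closed` flag is read and written by the node's own thread only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified node; not the actual SortNode. */
class SketchNode {
    private final Queue<Integer> rows = new ArrayDeque<>();
    private final List<Integer> downstream = new ArrayList<>();
    private int requested;

    /** Plain field: assumed to be touched from the node's own thread only. */
    private boolean closed;

    void add(int row) {
        rows.add(row);
    }

    void request(int cnt) {
        requested += cnt;
    }

    void close() {
        closed = true;
    }

    List<Integer> pushed() {
        return downstream;
    }

    void flush() {
        // Single check at the start of the task instead of one per iteration.
        if (closed)
            return;

        while (requested > 0 && !rows.isEmpty()) {
            requested--;
            downstream.add(rows.poll());
        }
    }
}
```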

##########
File path: 
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
##########
@@ -951,7 +951,7 @@ private RemoteFragmentKey(UUID nodeId, long fragmentId) {
         private final Set<RemoteFragmentKey> waiting;
 
         /** */
-        private QueryState state;
+        private volatile QueryState state;

Review comment:
       And what is 
[this](https://github.com/apache/ignite/pull/8858/files#diff-0438ecb05924a74f58ae34cb34ebeec22a061fbaa74968fe5e8f4729f530e810R997) 
then?



