korlov42 commented on code in PR #1856:
URL: https://github.com/apache/ignite-3/pull/1856#discussion_r1165294381


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java:
##########
@@ -542,6 +542,9 @@ void requestNextBatchIfNeeded() throws IgniteInternalCheckedException {
             // IO_BATCH_CNT should never be less than 1, but we don't have validation
             if (IO_BATCH_CNT <= 1 && inFlightCount == 0) {
                 batchRequester.request(1, sharedStateHolder);
+
+                lastRequested++;

Review Comment:
   that was a stupid bug =(
   
   Would it make sense to refactor the whole method like this?
   
   ```
           void requestNextBatchIfNeeded() throws IgniteInternalCheckedException {
               int maxInFlightCount = Math.max(IO_BATCH_CNT, 1);
               int currentInFlightCount = lastRequested - lastEnqueued;
   
               if (maxInFlightCount / 2 >= currentInFlightCount) {
                   int countOfBatches = maxInFlightCount - currentInFlightCount;
   
                   lastRequested += countOfBatches;
   
                   batchRequester.request(countOfBatches, sharedStateHolder);
                   // shared state should be sent only once until the next rewind
                   sharedStateHolder = null;
               }
           }
   ```
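
To check how this accounting behaves, here is a minimal standalone sketch. `InFlightWindow` and `onBatchEnqueued` are hypothetical names, and the returned count stands in for the call to `batchRequester.request(...)`; only the arithmetic is taken from the snippet above.

```java
/** Toy model of the in-flight batch accounting from the proposed refactoring. */
final class InFlightWindow {
    private final int maxInFlightCount;
    private int lastRequested;
    private int lastEnqueued;

    InFlightWindow(int ioBatchCnt) {
        // Guard against a configured batch count below 1.
        this.maxInFlightCount = Math.max(ioBatchCnt, 1);
    }

    /** Returns how many batches to request now, or 0 if enough are already in flight. */
    int requestNextBatchIfNeeded() {
        int currentInFlightCount = lastRequested - lastEnqueued;

        if (maxInFlightCount / 2 >= currentInFlightCount) {
            int countOfBatches = maxInFlightCount - currentInFlightCount;

            // Record the request before it is handed to the (omitted) requester.
            lastRequested += countOfBatches;

            return countOfBatches;
        }

        return 0;
    }

    /** Simulates one previously requested batch arriving and being enqueued. */
    void onBatchEnqueued() {
        lastEnqueued++;
    }
}
```

With a batch count of 1 the sketch requests one batch, requests nothing while that batch is in flight, and requests one more once it has been enqueued, which is the case the `lastRequested++` fix is about.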



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java:
##########
@@ -282,7 +301,49 @@ public void onNodeLeft(String nodeName) {
         }
     }
 
-    private static final class RemoteDownstream<RowT> {
+    /**
+     * Enqueue current rewind request, the tries to process rewind queue requests (in order) if possible.

Review Comment:
   did you mean `then tries`? 



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java:
##########
@@ -38,48 +41,107 @@
  * Tests to verify DML plans.
  */
 public class DmlPlannerTest extends AbstractPlannerTest {
-
     /**
-     * Test for INSERT .. VALUES when table has a single distribution.
+     * Test for INSERT .. FROM SELECT when a both tables has a single distribution.
+     * TODO: IGNITE-19018 split into 2 cases: colocated and non-colocated.

Review Comment:
   This TODO refers to the current issue.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java:
##########
@@ -282,7 +301,49 @@ public void onNodeLeft(String nodeName) {
         }
     }
 
-    private static final class RemoteDownstream<RowT> {
+    /**
+     * Enqueue current rewind request, the tries to process rewind queue requests (in order) if possible.
+     *
+     * @param nodeName Requester node name.
+     * @param state Shared state.
+     * @param amountOfBatches Amount of batches requested.
+     * @throws Exception If failed.
+     */
+    public void onRewindRequest(String nodeName, SharedState state, int amountOfBatches) throws Exception {
+        checkState();
+
+        if (rewindQueue == null) {
+            rewindQueue = new ArrayDeque<>(nodeBuffers.size());
+        }
+
+        rewindQueue.offer(new RewindRequest(nodeName, state, amountOfBatches));
+
+        if (currentNode == null || currentNode.equals(nodeName)) {
+            processRewindQueue();
+        }
+    }
+
+    /**
+     * Takes the next delayed request from the queue if available, then applies the state, rewinds source and proceeds with the request.
+     *
+     * @throws Exception If failed to send the request.
+     */
+    private void processRewindQueue() throws Exception {
+        RewindRequest rewind = rewindQueue.poll();
+
+        if (rewind == null) {
+            return;
+        }
+
+        currentNode = rewind.nodeName;
+
+        context().sharedState(rewind.state);
+        rewind();
+
+        onRequest(currentNode, rewind.amountOfBatches);
+    }
+
+    static final class RemoteDownstream<RowT> {

Review Comment:
   Why package-private? The same question applies to `RewindRequest`.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java:
##########
@@ -266,8 +278,15 @@ private void flush() throws Exception {
         if (waiting == 0) {
             source().request(waiting = inBufSize);
         } else if (waiting == -1) {
-            for (RemoteDownstream<RowT> buffer : nodeBuffers.values()) {
-                buffer.end();
+            if (currentNode != null) {
+                nodeBuffers.get(currentNode).end();
+                currentNode = null; // Allow incoming rewind request from next node.

Review Comment:
   Should we add `assert currentNode == null` to `processRewindQueue()` to make
   sure the current request is not accidentally clobbered by a call to that method?
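
A rough sketch of the invariant, as a simplified standalone model rather than the real `Outbox`: `RewindQueueSketch`, `onCurrentNodeDone`, and the `served` list are hypothetical, requests are reduced to node names, and the guard throws instead of using `assert` so that it fires even when assertions are disabled. The sketch deliberately omits the `currentNode.equals(nodeName)` branch of `onRewindRequest`; with that branch, as the diff reads, a strict `currentNode == null` check would fire, so the real condition may need to be relaxed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Simplified model: rewind requests are served one node at a time, in arrival order. */
final class RewindQueueSketch {
    private final Queue<String> rewindQueue = new ArrayDeque<>();

    /** Nodes whose rewind requests have started processing, in order. */
    final List<String> served = new ArrayList<>();

    private String currentNode;

    void onRewindRequest(String nodeName) {
        rewindQueue.offer(nodeName);

        // Only start a new rewind when no other node is being served.
        if (currentNode == null) {
            processRewindQueue();
        }
    }

    /** Hypothetical hook for the point where the current node's data has ended. */
    void onCurrentNodeDone() {
        currentNode = null;

        processRewindQueue();
    }

    private void processRewindQueue() {
        // The invariant under discussion: never rewind while a request is being served.
        if (currentNode != null) {
            throw new IllegalStateException("Rewind requested while serving " + currentNode);
        }

        String next = rewindQueue.poll();

        if (next == null) {
            return;
        }

        currentNode = next;
        served.add(next);
    }
}
```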



##########
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/DmlPlannerTest.java:
##########


Review Comment:
   I see no changes to the optimisation phase, so why do we need to change planner tests?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java:
##########
@@ -383,7 +444,7 @@ void reset() {
         void onBatchRequested(int amountOfBatches) throws Exception {
             assert amountOfBatches > 0 : amountOfBatches;
 
-            this.pendingCount = amountOfBatches;
+            this.pendingCount += amountOfBatches;

Review Comment:
   Let's add a test case for this to `ExchangeExecutionTest`.
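
Such a test could check that a second request arriving before the first one is fully served adds to the outstanding count instead of overwriting it. A minimal sketch of that property, with `PendingBatchCounter` and `trySendBatch` as hypothetical stand-ins for `RemoteDownstream` (the real `ExchangeExecutionTest` setup is not shown here):

```java
/** Toy model of the pending-batch counter touched by the change above. */
final class PendingBatchCounter {
    private int pendingCount;

    void onBatchRequested(int amountOfBatches) {
        if (amountOfBatches <= 0) {
            throw new IllegalArgumentException("amountOfBatches=" + amountOfBatches);
        }

        // Accumulate: a new request must not discard batches that are still owed.
        pendingCount += amountOfBatches;
    }

    /** Sends one batch if any are owed; returns whether a batch was sent. */
    boolean trySendBatch() {
        if (pendingCount == 0) {
            return false;
        }

        pendingCount--;

        return true;
    }

    int pendingCount() {
        return pendingCount;
    }
}
```

If two batches are requested, one is sent, and two more are requested, three batches are owed; with plain assignment the counter would drop to two and one batch would never be sent.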


