korlov42 commented on code in PR #1469:
URL: https://github.com/apache/ignite-3/pull/1469#discussion_r1083836141


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java:
##########
@@ -343,37 +346,59 @@ public void checkTransactionsWithDml() throws Exception {
         assertEquals(txManagerInternal.finished(), states.size());
     }
 
-    /** Check correctness of rw and ro transactions. */
+    /** Check correctness of rw and ro transactions for table scan. */
     @Test
-    public void checkMixedTransactions() throws Exception {
+    public void checkMixedTransactionsForTable() throws Exception {
+        sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
+
+        Matcher<String> planMatcher = containsTableScan("PUBLIC", "TEST");
+
+        checkMixedTransactions(planMatcher);
+    }
+

Review Comment:
   extra line 



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SubscriptionUtils.java:
##########
@@ -49,6 +50,31 @@ public static <T> Publisher<T> concat(Publisher<? extends 
T>... sources) {
         return new ConcatenatedPublisher<>(Arrays.asList(sources).iterator());
     }
 
+    /**
+     * Create thread-safe publisher wrapper of combine multiple publishers, 
sources for whom are suppliers what can be used for lazy
+     * opening these publishers . Generally, start consuming a source once the 
previous source has terminated, building a chain.
+     *
+     * @param sources Array of publisher suppliers which should be combine.
+     * @return Publisher which will be combine all of passed as parameter to 
single one.
+     */
+    @SafeVarargs
+    public static <T> Publisher<T> concat(Supplier<Publisher<T>>... sources) {

Review Comment:
   I think the javadoc has to be reworked. Currently, there are a lot of grammar 
mistakes, and it is hard to understand. Besides, the javadoc states that this 
method returns a thread-safe publisher, but a non-synchronised increment of a 
non-volatile field is used to move to the next source...



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -119,282 +91,122 @@ public IndexScanNode(
             @Nullable Function<RowT, RowT> rowTransformer,
             @Nullable BitSet requiredColumns
     ) {
-        super(ctx);
+        super(ctx, rowFactory, schemaTable, filters, rowTransformer, 
requiredColumns);
 
         assert !nullOrEmpty(parts);
-
-        assert ctx.transaction() != null || ctx.transactionTime() != null : 
"Transaction not initialized.";
+        assert rangeConditions == null || rangeConditions.size() > 0;
 
         this.schemaIndex = schemaIndex;
         this.parts = parts;
-        this.filters = filters;
-        this.rowTransformer = rowTransformer;
         this.requiredColumns = requiredColumns;
         this.rangeConditions = rangeConditions;
         this.comp = comp;
         this.factory = rowFactory;
 
-        rangeConditionIterator = rangeConditions == null ? null : 
rangeConditions.iterator();
-
-        tableRowConverter = row -> schemaTable.toRow(context(), row, factory, 
requiredColumns);
-
         indexRowSchema = 
RowConverter.createIndexRowSchema(schemaIndex.columns(), 
schemaTable.descriptor());
     }
 
     /** {@inheritDoc} */
     @Override
-    public void request(int rowsCnt) throws Exception {
-        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (!inLoop) {
-            context().execute(this::push, this::onError);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void closeInternal() {
-        super.closeInternal();
-
-        if (activeSubscription != null) {
-            activeSubscription.cancel();
-
-            activeSubscription = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    protected void rewindInternal() {
-        requested = 0;
-        waiting = 0;
-        rangeConditionsProcessed = false;
-
+    protected Publisher<RowT> scan() {
         if (rangeConditions != null) {
-            rangeConditionIterator = rangeConditions.iterator();
-        }
-
-        if (activeSubscription != null) {
-            activeSubscription.cancel();
-
-            activeSubscription = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void register(List<Node<RowT>> sources) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    protected Downstream<RowT> requestDownstream(int idx) {
-        throw new UnsupportedOperationException();
-    }
-
-    private void push() throws Exception {
-        if (isClosed()) {
-            return;
-        }
-
-        checkState();
-
-        if (requested > 0 && !inBuff.isEmpty()) {
-            inLoop = true;
-            try {
-                while (requested > 0 && !inBuff.isEmpty()) {
-                    checkState();
-
-                    RowT row = inBuff.poll();
-
-                    if (filters != null && !filters.test(row)) {
-                        continue;
-                    }
+            Publisher<RowT>[] conditionPublishers = new 
Publisher[rangeConditions.size()];
 
-                    if (rowTransformer != null) {
-                        row = rowTransformer.apply(row);
-                    }
+            Iterator<RangeCondition<RowT>> it = rangeConditions.iterator();
 
-                    requested--;
-                    downstream().push(row);
-                }
-            } finally {
-                inLoop = false;
-            }
-        }
-
-        if (requested > 0) {
-            if (waiting == 0 || activeSubscription == null) {
-                requestNextBatch();
-            }
-        }
-
-        if (requested > 0 && waiting == NOT_WAITING) {
-            if (inBuff.isEmpty()) {
-                requested = 0;
-                downstream().end();
-            } else {
-                context().execute(this::push, this::onError);
-            }
-        }
-    }
-
-    private void requestNextBatch() {
-        if (waiting == NOT_WAITING) {
-            return;
-        }
-
-        if (waiting == 0) {
-            // we must not request rows more than inBufSize
-            waiting = inBufSize - inBuff.size();
-        }
-
-        Subscription subscription = this.activeSubscription;
-        if (subscription != null) {
-            subscription.request(waiting);
-        } else if (!rangeConditionsProcessed) {
-            RangeCondition<RowT> cond = null;
-
-            if (rangeConditionIterator == null || 
!rangeConditionIterator.hasNext()) {
-                rangeConditionsProcessed = true;
-            } else {
-                cond = rangeConditionIterator.next();
-
-                rangeConditionsProcessed = !rangeConditionIterator.hasNext();
+            int i = 0;
+            while (it.hasNext()) {
+                conditionPublishers[i++] = indexPublisher(parts, it.next());
             }
 
-            indexPublisher(parts, cond).subscribe(new SubscriberImpl());
+            return SubscriptionUtils.concat(conditionPublishers);

Review Comment:
   this whole `if` block could be replaced with `return 
SubscriptionUtils.concat(new TransformingIterator<>(rangeConditions.iterator(), 
cond -> indexPublisher(parts, cond)));`



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SubscriptionUtils.java:
##########
@@ -49,6 +50,31 @@ public static <T> Publisher<T> concat(Publisher<? extends 
T>... sources) {
         return new ConcatenatedPublisher<>(Arrays.asList(sources).iterator());
     }
 
+    /**
+     * Create thread-safe publisher wrapper of combine multiple publishers, 
sources for whom are suppliers what can be used for lazy
+     * opening these publishers . Generally, start consuming a source once the 
previous source has terminated, building a chain.
+     *
+     * @param sources Array of publisher suppliers which should be combine.
+     * @return Publisher which will be combine all of passed as parameter to 
single one.
+     */
+    @SafeVarargs
+    public static <T> Publisher<T> concat(Supplier<Publisher<T>>... sources) {
+        return concat(new Iterator() {
+            private int idx;
+
+            @Override
+            public boolean hasNext() {
+                return idx < sources.length;
+            }
+
+            @Override
+            public Publisher<T> next() {
+                return sources[idx++].get();
+            }
+        });
+    }
+
+

Review Comment:
   extra line



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/AbstractBasicIntegrationTest.java:
##########
@@ -227,8 +227,25 @@ public void tearDown(TestInfo testInfo) throws Exception {
         tearDownBase(testInfo);
     }
 
+    /**
+     * Start execute query and check all passed to the builder asserts.

Review Comment:
   `Start execute` doesn't sound like a valid statement. You need to use a noun 
after the verb.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -119,282 +91,122 @@ public IndexScanNode(
             @Nullable Function<RowT, RowT> rowTransformer,
             @Nullable BitSet requiredColumns
     ) {
-        super(ctx);
+        super(ctx, rowFactory, schemaTable, filters, rowTransformer, 
requiredColumns);
 
         assert !nullOrEmpty(parts);
-
-        assert ctx.transaction() != null || ctx.transactionTime() != null : 
"Transaction not initialized.";
+        assert rangeConditions == null || rangeConditions.size() > 0;
 
         this.schemaIndex = schemaIndex;
         this.parts = parts;
-        this.filters = filters;
-        this.rowTransformer = rowTransformer;
         this.requiredColumns = requiredColumns;
         this.rangeConditions = rangeConditions;
         this.comp = comp;
         this.factory = rowFactory;
 
-        rangeConditionIterator = rangeConditions == null ? null : 
rangeConditions.iterator();
-
-        tableRowConverter = row -> schemaTable.toRow(context(), row, factory, 
requiredColumns);
-
         indexRowSchema = 
RowConverter.createIndexRowSchema(schemaIndex.columns(), 
schemaTable.descriptor());
     }
 
     /** {@inheritDoc} */
     @Override
-    public void request(int rowsCnt) throws Exception {
-        assert rowsCnt > 0 && requested == 0 : "rowsCnt=" + rowsCnt + ", 
requested=" + requested;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (!inLoop) {
-            context().execute(this::push, this::onError);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void closeInternal() {
-        super.closeInternal();
-
-        if (activeSubscription != null) {
-            activeSubscription.cancel();
-
-            activeSubscription = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    protected void rewindInternal() {
-        requested = 0;
-        waiting = 0;
-        rangeConditionsProcessed = false;
-
+    protected Publisher<RowT> scan() {
         if (rangeConditions != null) {
-            rangeConditionIterator = rangeConditions.iterator();
-        }
-
-        if (activeSubscription != null) {
-            activeSubscription.cancel();
-
-            activeSubscription = null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void register(List<Node<RowT>> sources) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    protected Downstream<RowT> requestDownstream(int idx) {
-        throw new UnsupportedOperationException();
-    }
-
-    private void push() throws Exception {
-        if (isClosed()) {
-            return;
-        }
-
-        checkState();
-
-        if (requested > 0 && !inBuff.isEmpty()) {
-            inLoop = true;
-            try {
-                while (requested > 0 && !inBuff.isEmpty()) {
-                    checkState();
-
-                    RowT row = inBuff.poll();
-
-                    if (filters != null && !filters.test(row)) {
-                        continue;
-                    }
+            Publisher<RowT>[] conditionPublishers = new 
Publisher[rangeConditions.size()];
 
-                    if (rowTransformer != null) {
-                        row = rowTransformer.apply(row);
-                    }
+            Iterator<RangeCondition<RowT>> it = rangeConditions.iterator();
 
-                    requested--;
-                    downstream().push(row);
-                }
-            } finally {
-                inLoop = false;
-            }
-        }
-
-        if (requested > 0) {
-            if (waiting == 0 || activeSubscription == null) {
-                requestNextBatch();
-            }
-        }
-
-        if (requested > 0 && waiting == NOT_WAITING) {
-            if (inBuff.isEmpty()) {
-                requested = 0;
-                downstream().end();
-            } else {
-                context().execute(this::push, this::onError);
-            }
-        }
-    }
-
-    private void requestNextBatch() {
-        if (waiting == NOT_WAITING) {
-            return;
-        }
-
-        if (waiting == 0) {
-            // we must not request rows more than inBufSize
-            waiting = inBufSize - inBuff.size();
-        }
-
-        Subscription subscription = this.activeSubscription;
-        if (subscription != null) {
-            subscription.request(waiting);
-        } else if (!rangeConditionsProcessed) {
-            RangeCondition<RowT> cond = null;
-
-            if (rangeConditionIterator == null || 
!rangeConditionIterator.hasNext()) {
-                rangeConditionsProcessed = true;
-            } else {
-                cond = rangeConditionIterator.next();
-
-                rangeConditionsProcessed = !rangeConditionIterator.hasNext();
+            int i = 0;
+            while (it.hasNext()) {
+                conditionPublishers[i++] = indexPublisher(parts, it.next());
             }
 
-            indexPublisher(parts, cond).subscribe(new SubscriberImpl());
+            return SubscriptionUtils.concat(conditionPublishers);
         } else {
-            waiting = NOT_WAITING;
+            return indexPublisher(parts, null);
         }
     }
 
     private Publisher<RowT> indexPublisher(int[] parts, @Nullable 
RangeCondition<RowT> cond) {

Review Comment:
   I would suggest using `TransformingIterator` here. It will make the code much 
cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to