korlov42 commented on code in PR #1237:
URL: https://github.com/apache/ignite-3/pull/1237#discussion_r1009267210


##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -70,9 +74,9 @@
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
 
-    private final RangeIterable<RowT> rangeConditions;
+    private final @Nullable Comparator<BinaryTuple> comp;

Review Comment:
   You need this iterable to reopen cursor after `rewind` operation.
   
   We use rewind to re-execute the source tree with new context. Assume the 
following case: 
   ```
   parentNode {
          Node source;
          doJob() {
                  ctx.setVariable(corrVarId_1, someValue)
   
                  source.rewind();
                  source.request(n);
          }
   }
   ```
   In the example above, parent node set some value in the context, and then 
request the next portion of data from the source node. In order to make the 
source node to be aware of changing in the context, parent node invokes 
`rewind` on a source node as well.



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java:
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite publisher.
+ */
+public class CompositePublisher<T> implements Flow.Publisher<T> {
+    /** List of registered publishers. */
+    private final Collection<? extends Publisher<T>> publishers;
+
+    /** Flag indicating the state of the subscription. */
+    private final AtomicBoolean subscribed = new AtomicBoolean();
+
+    /** Items comparator. */
+    private final Comparator<T> comp;
+
+    public CompositePublisher(Collection<? extends Publisher<T>> publishers, 
@Nullable Comparator<T> comp) {
+        this.publishers = publishers;
+        this.comp = comp;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void subscribe(Subscriber<? super T> delegate) {
+        if (!subscribed.compareAndSet(false, true)) {

Review Comment:
   what's wrong with multiple subscriptions? 



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexScanNode.java:
##########
@@ -70,9 +74,9 @@
     /** Participating columns. */
     private final @Nullable BitSet requiredColumns;
 
-    private final RangeIterable<RowT> rangeConditions;
+    private final @Nullable Comparator<BinaryTuple> comp;
 
-    private Iterator<RangeCondition<RowT>> rangeConditionIterator;
+    private final @Nullable Iterator<RangeCondition<RowT>> 
rangeConditionIterator;

Review Comment:
   see comment above. We need to create the iterator after ever `rewind`



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java:
##########
@@ -305,7 +305,7 @@ public Node<RowT> visit(IgniteIndexScan rel) {
         ColocationGroup group = ctx.group(rel.sourceId());
         int[] parts = group.partitions(ctx.localNodeId());
 
-        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, ranges, 
filters, prj, requiredColumns.toBitSet());
+        return new IndexScanNode<>(ctx, rowType, idx, tbl, parts, 
rel.collation(), ranges, filters, prj, requiredColumns.toBitSet());

Review Comment:
   can we create comparator outside of the scanNode?



##########
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/CompositePublisher.java:
##########
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Flow;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Composite publisher.
+ */
+public class CompositePublisher<T> implements Flow.Publisher<T> {
+    /** List of registered publishers. */
+    private final Collection<? extends Publisher<T>> publishers;
+
+    /** Flag indicating the state of the subscription. */
+    private final AtomicBoolean subscribed = new AtomicBoolean();
+
+    /** Items comparator. */
+    private final Comparator<T> comp;
+
+    public CompositePublisher(Collection<? extends Publisher<T>> publishers, 
@Nullable Comparator<T> comp) {

Review Comment:
   Does it make sense to introduce two different classes for two different 
strategies? In my opinion, this will make an API  a little bit cleaner and 
predictable. Assume the case, when you need a SortingPublisher, but due to bug 
in the code you will pass a `null` comparator. I'd prefer to throw an exception 
during publisher creation, rather than returning an invalid result



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to