alex-plekhanov commented on code in PR #12096:
URL: https://github.com/apache/ignite/pull/12096#discussion_r3201492709
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -83,22 +82,22 @@ public IgniteWindow(
assert !grp.aggCalls.isEmpty();
}
- /** */
Review Comment:
Comment is absent
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/externalize/RelJson.java:
##########
@@ -122,18 +122,18 @@
*/
@SuppressWarnings({"rawtypes", "unchecked"})
class RelJson {
- /** */
+ /** */
Review Comment:
All these changes to comments with two spaces (`/* */`) should be reverted
to comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowFunctionFactory.java:
##########
@@ -18,55 +18,62 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorWrapper;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AccumulatorsFactoryBase;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.jetbrains.annotations.NotNull;
/** A factory class responsible for instantiating window functions. */
-final class WindowFunctionFactory<Row> extends AccumulatorsFactoryBase<Row>
implements Supplier<List<WindowFunctionWrapper<Row>>> {
-
- /** */
+final class WindowFunctionFactory<Row> extends AccumulatorsFactoryBase<Row> {
+ /** */
Review Comment:
All these changes to comments with two spaces (/* */) should be reverted to
comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -29,31 +29,34 @@
/** Window node. */
public class WindowNode<Row> extends MemoryTrackingNode<Row> implements
SingleNode<Row>, Downstream<Row> {
- /** */
+ /** */
Review Comment:
All these changes to comments with two spaces (/* */) should be reverted to
comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -186,33 +185,25 @@ public boolean isStreaming() {
RelTraitSet traits = target;
RelCollation requiredCollation = TraitUtils.collation(target);
- if (!satisfiesCollationSansGroupFields(requiredCollation)) {
+ if (!satisfiesCollationSansGroupFields(requiredCollation))
traits = traits.replace(collation());
- }
IgniteDistribution distribution = TraitUtils.distribution(target);
if (!satisfiesDistribution(distribution))
traits = traits.replace(distribution());
- else if (distribution.getType() ==
RelDistribution.Type.HASH_DISTRIBUTED) {
- // Group set contains all distribution keys, shift distribution
keys according to used columns.
- IgniteDistribution outDistribution =
distribution.apply(Commons.mapping(grp.keys, rowType.getFieldCount()));
- traits = traits.replace(outDistribution);
- }
- if (traits == traitSet) {
- // new traits equal to current traits of window.
- // no need to pass throught or derive any.
+ if (traits == traitSet)
+ // New traits equal to current traits of window.
+ // No need to pass throught or derive any.
return null;
- }
return traits;
}
/** Check input distribution satisfies collation of this window. */
Review Comment:
distribution satisfies distribution
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -59,7 +64,22 @@ private WindowConverterRule() {
RelNode result = window.getInput();
- assert window.constants.isEmpty();
+ int inputFldCnt = result.getRowType().getFieldCount();
+ RexShuttle restoreConstant = new RexShuttle() {
+
Review Comment:
Redundant line
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -59,7 +64,22 @@ private WindowConverterRule() {
RelNode result = window.getInput();
- assert window.constants.isEmpty();
+ int inputFldCnt = result.getRowType().getFieldCount();
+ RexShuttle restoreConstant = new RexShuttle() {
+
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ int idx = inputRef.getIndex();
+ if (idx < inputFldCnt) {
+ return inputRef;
+ } else {
+ // index above input field count reffers to window constant
Review Comment:
Start with upper case letter, end with point
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/WindowNode.java:
##########
@@ -76,14 +79,12 @@ public WindowNode(
requested = rowsCnt;
- doPush();
-
- if (waiting == 0) {
- waiting = IN_BUFFER_SIZE;
- source().request(IN_BUFFER_SIZE);
+ if (!inLoop) {
Review Comment:
Codestyle: Redundant braces for one line statement.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowFunctions.java:
##########
@@ -38,7 +38,6 @@
/** */
public final class WindowFunctions {
-
/** Check window group can be processed with streaming partition. */
public static boolean streamable(Window.Group group) {
Review Comment:
Abbreviation should be used for `group`
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/BufferingWindowPartition.java:
##########
@@ -92,17 +90,22 @@ final class BufferingWindowPartition<Row> extends
WindowPartitionBase<Row> {
frame.reset();
}
+ @Override public boolean isStreaming() {
Review Comment:
Javadoc is absent
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -106,28 +126,36 @@ private WindowConverterRule() {
return (PhysicalNode)result;
}
- /** Replaces origial agg call ordinal with sequential index within group.
*/
- private static Window.Group replaceAggCallOrdinal(Window.Group grp) {
+ /**
+ * Replaces:
+ * - original agg call ordinal with sequential index within group;
+ * - input ref to window constant with actual rex literal.
+ */
+ private static IgniteWindow.Group convertGroup(Window.Group grp,
RexShuttle restoreConstant) {
List<Window.RexWinAggCall> newAggCalls = new
ArrayList<>(grp.aggCalls.size());
ImmutableList<Window.RexWinAggCall> calls = grp.aggCalls;
for (int i = 0; i < calls.size(); i++) {
Window.RexWinAggCall aggCall = calls.get(i);
+
+
Review Comment:
Redudnant line
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/RangeWindowPartitionFrame.java:
##########
@@ -41,24 +41,23 @@
/** {@link WindowFunctionFrame} for RANGE clause. */
final class RangeWindowPartitionFrame<Row> extends WindowFunctionFrame<Row> {
-
/** Comparator for determining a peer's index within a partition. */
private final Comparator<Row> peerCmp;
/** Returns the row that marks the start of the frame. */
private final Function<Row, Row> lowerBound;
- /** */
+ /** */
Review Comment:
All these changes to comments with two spaces (/* */) should be reverted to
comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowPartitionFactory.java:
##########
@@ -17,52 +17,107 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import java.util.function.Supplier;
-import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexSlot;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
/** Factory to create {@link WindowPartitionBase} factory from {@link
Window.Group}. */
public final class WindowPartitionFactory<Row> implements
Supplier<WindowPartition<Row>> {
+ /** */
Review Comment:
All these changes to comments with two spaces (/* */) should be reverted to
comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteWindow.java:
##########
@@ -51,24 +51,23 @@
/**
* A relational expression representing a set of window aggregates.
*
- * <p>A Window can handle several window aggregate functions, over several
- * partitions, with pre- and post-expressions, and an optional post-filter.
- * Each of the partitions is defined by a partition key (zero or more columns)
- * and a range (logical or physical). The partitions expect the data to be
+ * <p>A Window can handle several window aggregate functions, over one
+ * partition, with pre- and post-expressions, and an optional post-filter.
+ * Partitions is defined by a partition key (zero or more columns)
+ * and a range (logical or physical). The partition expect the data to be
* sorted correctly on input to the relational expression.
*
* <p>Each {@link Window.Group} has a set of
* {@link org.apache.calcite.rex.RexOver} objects.
*/
public class IgniteWindow extends Window implements IgniteRel {
-
- /** */
+ /** */
Review Comment:
All these changes to comments with two spaces (/* */) should be reverted to
comments with one space.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/WindowConverterRule.java:
##########
@@ -59,7 +64,22 @@ private WindowConverterRule() {
RelNode result = window.getInput();
- assert window.constants.isEmpty();
+ int inputFldCnt = result.getRowType().getFieldCount();
+ RexShuttle restoreConstant = new RexShuttle() {
+
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ int idx = inputRef.getIndex();
+ if (idx < inputFldCnt) {
Review Comment:
Redundant braces
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/WindowPartitionFactory.java:
##########
@@ -17,52 +17,107 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp.window;
+import java.util.ArrayList;
import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
import java.util.function.Supplier;
-import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexSlot;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.internal.U;
/** Factory to create {@link WindowPartitionBase} factory from {@link
Window.Group}. */
public final class WindowPartitionFactory<Row> implements
Supplier<WindowPartition<Row>> {
+ /** */
+ private final WindowFunctionFactory<Row> funcFactory;
- /** */
- private final Supplier<WindowPartition<Row>> supplier;
+ /** */
+ private final ExecutionContext<Row> ctx;
- /** */
+ /** */
+ private final Window.Group grp;
+
+ /** */
+ private final RelDataType inputRowType;
+
+ /** */
+ private final RowHandler.RowFactory<Row> rowFactory;
+
+ /** */
+ private final Comparator<Row> peerCmp;
+
+ /** */
+ private final Function<Row, Row> project;
+
+ /** */
public WindowPartitionFactory(
ExecutionContext<Row> ctx,
- Window.Group group,
- List<AggregateCall> calls,
- RelDataType inputRowType,
- boolean streaming
+ Window.Group grp,
+ RelDataType inputRowType
) {
- supplier = () -> {
- List<RelDataType> aggTypes = Commons.transform(calls,
AggregateCall::getType);
- RowHandler.RowFactory<Row> aggRowFactory =
ctx.rowHandler().factory(Commons.typeFactory(), aggTypes);
-
- Comparator<Row> peerCmp;
- if (group.isRows)
- // peer comparator in meaningless in rows frame.
- peerCmp = null;
- else
- peerCmp =
ctx.expressionFactory().comparator(group.collation());
-
- WindowFunctionFactory<Row> accFactory = new
WindowFunctionFactory<>(ctx, group, calls, inputRowType);
- assert !streaming || (streaming && accFactory.isStreamable()) :
"Streaming window partition desired, but buffering is required";
- if (accFactory.isStreamable())
- return new StreamWindowPartition<>(peerCmp, accFactory,
aggRowFactory);
- else
- return new BufferingWindowPartition<>(peerCmp, accFactory,
aggRowFactory, ctx, group, inputRowType);
- };
+ this.ctx = ctx;
+ this.grp = grp;
+ this.inputRowType = inputRowType;
+
+ List<RelDataType> aggTypes = Commons.transform(grp.aggCalls,
Window.RexWinAggCall::getType);
+ rowFactory = ctx.rowHandler().factory(Commons.typeFactory(), aggTypes);
+ if (grp.isRows)
+ // peer comparator in meaningless in rows frame.
+ peerCmp = null;
+ else
+ peerCmp = ctx.expressionFactory().comparator(grp.collation());
+
+ Map<RexNode, Integer> rexToOrd = new LinkedHashMap<>();
+ RelDataType aggInputRowType = mapAggregateInputRowType(grp,
inputRowType, rexToOrd);
+
+ project =
ctx.expressionFactory().project(U.arrayList(rexToOrd.keySet()), inputRowType);
+
+ funcFactory = new WindowFunctionFactory<>(ctx, grp, project, rexToOrd,
aggInputRowType);
}
/** {@inheritDoc} */
@Override public WindowPartition<Row> get() {
- return supplier.get();
+ if (funcFactory.isStreamable())
+ return new StreamWindowPartition<>(peerCmp, funcFactory,
rowFactory);
+ else
+ return new BufferingWindowPartition<>(peerCmp, funcFactory,
rowFactory, ctx, grp, project, inputRowType);
}
+
+ /** Generates input row type for concrete aggregate call. */
+ private RelDataType mapAggregateInputRowType(Window.Group grp, RelDataType
inputRowType,
+ Map<RexNode, Integer> rexToOrd) {
+
+ List<RelDataTypeField> flds = new ArrayList<>();
+ int ord = 0;
+ for (Window.RexWinAggCall call : grp.aggCalls) {
+ for (int i = 0; i < call.operands.size(); i++) {
+ RexNode operand = call.operands.get(i);
+ if (rexToOrd.containsKey(operand)) {
Review Comment:
Codestyle: Redundant braces for one line statement.
##########
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/window/StreamWindowPartition.java:
##########
@@ -92,4 +92,8 @@ final class StreamWindowPartition<Row> extends
WindowPartitionBase<Row> {
peerIdx = -1;
accumulators = null;
}
+
+ @Override public boolean isStreaming() {
Review Comment:
Javadoc is absent
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]