[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-26 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r446320901



##
File path: 
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-throw Util.needToImplement("EnumerableSortedAggregate");
+if (getGroupType() != Group.SIMPLE
+|| aggCalls.isEmpty()) {
+  throw Util.needToImplement("EnumerableSortedAggregate");
+}
+
+final JavaTypeFactory typeFactory = implementor.getTypeFactory();
+final BlockBuilder builder = new BlockBuilder();
+final EnumerableRel child = (EnumerableRel) getInput();
+final Result result = implementor.visitChild(this, 0, child, pref);
+Expression childExp =
+builder.append(
+"child",
+result.block);
+
+final PhysType physType =
+PhysTypeImpl.of(
+typeFactory, getRowType(), pref.preferCustom());
+
+final PhysType inputPhysType = result.physType;
+
+ParameterExpression parameter =
+Expressions.parameter(inputPhysType.getJavaRowType(), "a0");
+
+final PhysType keyPhysType =
+inputPhysType.project(groupSet.asList(), getGroupType() != 
Group.SIMPLE,
+JavaRowFormat.LIST);
+final int groupCount = getGroupCount();
+
+final List aggs = new ArrayList<>(aggCalls.size());
+for (Ord call : Ord.zip(aggCalls)) {
+  aggs.add(new AggImpState(call.i, call.e, false));
+}
+
+// Function0 accumulatorInitializer =
+// new Function0() {
+// public Object[] apply() {
+// return new Object[] {0, 0};
+// }
+// };
+final List initExpressions = new ArrayList<>();
+final BlockBuilder initBlock = new BlockBuilder();
+
+final List aggStateTypes = createAggStateTypes(
+initExpressions, initBlock, aggs, typeFactory);
+
+final PhysType accPhysType =
+PhysTypeImpl.of(typeFactory,
+typeFactory.createSyntheticType(aggStateTypes));
+
+declareParentAccumulator(initExpressions, initBlock, accPhysType);
+
+final Expression accumulatorInitializer =
+builder.append("accumulatorInitializer",
+Expressions.lambda(
+Function0.class,
+initBlock.toBlock()));
+
+// Function2 accumulatorAdder =
+// new Function2() {
+// public Object[] apply(Object[] acc, Employee in) {
+//  acc[0] = ((Integer) acc[0]) + 1;
+//  acc[1] = ((Integer) acc[1]) + in.salary;
+// return acc;
+// }
+// };
+final ParameterExpression inParameter =
+Expressions.parameter(inputPhysType.getJavaRowType(), "in");
+final ParameterExpression acc_ =
+Expressions.parameter(accPhysType.getJavaRowType(), "acc");
+
+createAccumulatorAdders(
+inParameter, aggs, accPhysType, acc_, inputPhysType, builder, 
implementor, typeFactory);
+
+final ParameterExpression lambdaFactory =
+Expressions.parameter(AggregateLambdaFactory.class,
+builder.newName("lambdaFactory"));
+
+implementLambdaFactory(builder, inputPhysType, aggs, 
accumulatorInitializer,
+false, lambdaFactory);
+
+final BlockBuilder resultBlock = new BlockBuilder();
+final List results = Expressions.list();
+final ParameterExpression key_;
+final Type keyType = keyPhysType.getJavaRowType();
+key_ = Expressions.parameter(keyType, "key");
+for (int j = 0; j < groupCount; j++) {
+  final Expression ref = keyPhysType.fieldReference(key_, j);
+  results.add(ref);
+}
+
+for (final AggImpState agg : aggs) {
+  results.add(
+  agg.implementor.implementResult(agg.context,
+  new AggResultContextImpl(resultBlock, agg.call, agg.state, key_,
+  keyPhysType)));
+}
+resultBlock.add(physType.record(results));
+
+final Expression keySelector_ =
+builder.append("keySelector",
+inputPhysType.generateSelector(parameter,
+groupSet.asList(),
+keyPhysType.getFormat()));
+// Generate the appropriate key Comparator.
+final Expression comparator = 
keyPhysType.generateComparator(getTraitSet().getCollation());

Review comment:
   In fact, I cannot find a way to turn on top-down optimization via Hook.PLANNER.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733718



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {
+  if (curAccumulator == null) {
+curAccumulator = accumulatorInitializer.apply();
+  }
+  TResult result = null;
+  TSource o = enumerator.current();
+  TKey prevKey = keySelector.apply(o);
+  curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+  while (enumerator.moveNext()) {
+o = enumerator.current();
+TKey curKey = keySelector.apply(o);

Review comment:
   Added this requirement to both the Javadoc of `sortedGroupBy` and the relevant 
lines in `EnumerableSortedAggregate`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445784153



##
File path: 
core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
   Thanks for your suggestion. I added a unit test for each of the 
suggested items.

##
File path: 
core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
   Thanks for your suggestion. I added a unit test for each of the 
suggested items.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445734744



##
File path: 
core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
   Missed this comment. Will add more tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445734105



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,132 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private boolean isLastMoveNextFalse;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+private TResult curResult;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+  curResult = null;
+  isLastMoveNextFalse = false;
+}
+
+@Override public TResult current() {
+  if (isLastMoveNextFalse) {
+throw new NoSuchElementException();
+  }
+  return curResult;
+}
+
+@Override public boolean moveNext() {
+  if (!isInitialized) {
+isInitialized = true;
+// input is empty
+if (!enumerator.moveNext()) {
+  isLastMoveNextFalse = true;
+  return false;
+}
+  } else if (isInitialized && curAccumulator == null) {

Review comment:
   Yes! Removed `isInitialized`. It indeed should be true after the first 
call of `moveNext`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733718



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {
+  if (curAccumulator == null) {
+curAccumulator = accumulatorInitializer.apply();
+  }
+  TResult result = null;
+  TSource o = enumerator.current();
+  TKey prevKey = keySelector.apply(o);
+  curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+  while (enumerator.moveNext()) {
+o = enumerator.current();
+TKey curKey = keySelector.apply(o);

Review comment:
   Added this requirement to both the Javadoc of `sortedGroupBy` and the relevant 
lines in `EnumerableSortedAggregate`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-25 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445733828



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,132 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private boolean isLastMoveNextFalse;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+private TResult curResult;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+  curResult = null;
+  isLastMoveNextFalse = false;
+}
+
+@Override public TResult current() {
+  if (isLastMoveNextFalse) {
+throw new NoSuchElementException();
+  }
+  return curResult;
+}
+
+@Override public boolean moveNext() {
+  if (!isInitialized) {
+isInitialized = true;
+// input is empty
+if (!enumerator.moveNext()) {
+  isLastMoveNextFalse = true;
+  return false;
+}
+  } else if (isInitialized && curAccumulator == null) {
+// input has been exhausted.
+isLastMoveNextFalse = true;
+return false;
+  }
+
+  if (curAccumulator == null) {
+curAccumulator = accumulatorInitializer.apply();
+  }
+
+  // reset result because now it can move to next aggregated result.
+  curResult = null;
+  TSource o = enumerator.current();
+  TKey prevKey = keySelector.apply(o);
+  curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+  while (enumerator.moveNext()) {
+o = enumerator.current();
+TKey curKey = keySelector.apply(o);
+if (comparator.compare(prevKey, curKey) != 0) {
+  // current key is different from previous key, get accumulated 
results and re-create
+  // accumulator for current key.
+  curResult = resultSelector.apply(prevKey, curAccumulator);
+  curAccumulator = accumulatorInitializer.apply();
+  break;
+} else {

Review comment:
   Yes! Removed `else`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445213183



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {
+  if (curAccumulator == null) {
+curAccumulator = accumulatorInitializer.apply();
+  }
+  TResult result = null;
+  TSource o = enumerator.current();
+  TKey prevKey = keySelector.apply(o);
+  curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+  while (enumerator.moveNext()) {
+o = enumerator.current();
+TKey curKey = keySelector.apply(o);

Review comment:
   Added a test covering null values. Based on the test, the comparator puts nulls 
in the last position (ASC).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-24 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r445212926



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {

Review comment:
   Thanks for the example!
   
   I double-checked the contract and reached the following conclusion:
   ```
  * After an enumerator is created or after the {@link #reset} method is
  * called, the {@link #moveNext} method must be called to advance the
  * enumerator to the first element of the collection before reading the
  * value of the {@code current} property; otherwise, {@code current} is
  * undefined.
   ```
   So in this case, returning `null` is acceptable because the result is undefined.
   
   ```
  * This method also throws {@link java.util.NoSuchElementException} if
  * the last call to {@code moveNext} returned {@code false}, which 
indicates
  * the end of the collection.
   ```
   I updated the code to make sure it throws `NoSuchElementException` in this 
case.
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-23 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444658998



##
File path: 
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-throw Util.needToImplement("EnumerableSortedAggregate");
+if (getGroupType() != Group.SIMPLE

Review comment:
   That's a good point. In fact, I think I should follow 
https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregateRule.java#L41





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-23 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444658637



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {
+  if (curAccumulator == null) {
+curAccumulator = accumulatorInitializer.apply();
+  }
+  TResult result = null;
+  TSource o = enumerator.current();
+  TKey prevKey = keySelector.apply(o);
+  curAccumulator = accumulatorAdder.apply(curAccumulator, o);
+  while (enumerator.moveNext()) {
+o = enumerator.current();
+TKey curKey = keySelector.apply(o);

Review comment:
   This is a good question. 
   
   If we allow `null`, then the comparator should give a consistent ordering for 
null (e.g. https://stackoverflow.com/a/2401629/10055573)
   
   If we don't allow `null`, indeed we should have a check somewhere.
   
   However, the problem is that I don't know whether all of those comparators give a 
consistent ordering for null. 
   
   Also, note this comment about `mergeJoin`:
   ```
   // mergeJoin assumes inputs sorted in ascending order with nulls 
last,
   // if we reach a null key, we are done.
   ```
   
   I am actually not sure what the right resolution should be. What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-23 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r444657447



##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override public TResult current() {

Review comment:
   This is a great point. Indeed, `current` should return the same object unless 
the enumerator moves to the next element or is reset.
   
   I moved a bunch of code to `moveNext`, and `current` now only returns a cached 
result.

##
File path: 
linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
##
@@ -817,6 +817,112 @@ public void remove() {
 resultSelector);
   }
 
+  /**
+   * Group keys are sorted already. Key values are compared by using a
+   * specified comparator. Groups the elements of a sequence according to a
+   * specified key selector function and initializing one accumulator at a 
time.
+   * Go over elements sequentially, adding to accumulator each time an element
+   * with the same key is seen. When key changes, creates a result value from 
the
+   * accumulator and then re-initializes the accumulator.
+   */
+  public static  Enumerable 
sortedGroupBy(
+  Enumerable enumerable,
+  Function1 keySelector,
+  Function0 accumulatorInitializer,
+  Function2 accumulatorAdder,
+  final Function2 resultSelector,
+  final Comparator comparator) {
+return new AbstractEnumerable() {
+  public Enumerator enumerator() {
+return new SortedAggregateEnumerator(
+  enumerable, keySelector, accumulatorInitializer,
+  accumulatorAdder, resultSelector, comparator);
+  }
+};
+  }
+
+  private static class SortedAggregateEnumerator
+  implements Enumerator {
+private final Enumerable enumerable;
+private final Function1 keySelector;
+private final Function0 accumulatorInitializer;
+private final Function2 
accumulatorAdder;
+private final Function2 resultSelector;
+private final Comparator comparator;
+private boolean isInitialized;
+private TAccumulate curAccumulator;
+private Enumerator enumerator;
+
+SortedAggregateEnumerator(
+Enumerable enumerable,
+Function1 keySelector,
+Function0 accumulatorInitializer,
+Function2 accumulatorAdder,
+final Function2 resultSelector,
+final Comparator comparator) {
+  this.enumerable = enumerable;
+  this.keySelector = keySelector;
+  this.accumulatorInitializer = accumulatorInitializer;
+  this.accumulatorAdder = accumulatorAdder;
+  this.resultSelector = resultSelector;
+  this.comparator = comparator;
+  isInitialized = false;
+  curAccumulator = null;
+  enumerator = enumerable.enumerator();
+}
+
+@Override 

[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-18 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442417140



##
File path: 
core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
   @hsyuan any suggestions on what extra tests would be useful for 
`EnumerableSortedAggregate`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-18 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442417140



##
File path: 
core/src/test/java/org/apache/calcite/test/enumerable/EnumerableSortedAggregateTest.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.enumerable;
+
+import org.apache.calcite.adapter.enumerable.EnumerableRules;
+import org.apache.calcite.adapter.java.ReflectiveSchema;
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.runtime.Hook;
+import org.apache.calcite.test.CalciteAssert;
+import org.apache.calcite.test.JdbcTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Consumer;
+
+public class EnumerableSortedAggregateTest {
+  @Test void sortedAgg() {

Review comment:
   @hsyuan any suggestions on what extra tests would be useful for 
`EnumerableSortedAggregate`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [calcite] amaliujia commented on a change in pull request #2035: [CALCITE-4008] Implement Code generation for EnumerableSortedAggregat…

2020-06-18 Thread GitBox


amaliujia commented on a change in pull request #2035:
URL: https://github.com/apache/calcite/pull/2035#discussion_r442416752



##
File path: 
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableSortedAggregate.java
##
@@ -90,6 +101,133 @@ public EnumerableSortedAggregate(
   }
 
   public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
-throw Util.needToImplement("EnumerableSortedAggregate");
+if (getGroupType() != Group.SIMPLE
+|| aggCalls.isEmpty()) {
+  throw Util.needToImplement("EnumerableSortedAggregate");
+}
+
+final JavaTypeFactory typeFactory = implementor.getTypeFactory();
+final BlockBuilder builder = new BlockBuilder();
+final EnumerableRel child = (EnumerableRel) getInput();
+final Result result = implementor.visitChild(this, 0, child, pref);
+Expression childExp =
+builder.append(
+"child",
+result.block);
+
+final PhysType physType =
+PhysTypeImpl.of(
+typeFactory, getRowType(), pref.preferCustom());
+
+final PhysType inputPhysType = result.physType;
+
+ParameterExpression parameter =
+Expressions.parameter(inputPhysType.getJavaRowType(), "a0");
+
+final PhysType keyPhysType =
+inputPhysType.project(groupSet.asList(), getGroupType() != 
Group.SIMPLE,
+JavaRowFormat.LIST);
+final int groupCount = getGroupCount();
+
+final List aggs = new ArrayList<>(aggCalls.size());
+for (Ord call : Ord.zip(aggCalls)) {
+  aggs.add(new AggImpState(call.i, call.e, false));
+}
+
+// Function0 accumulatorInitializer =
+// new Function0() {
+// public Object[] apply() {
+// return new Object[] {0, 0};
+// }
+// };
+final List initExpressions = new ArrayList<>();
+final BlockBuilder initBlock = new BlockBuilder();
+
+final List aggStateTypes = createAggStateTypes(
+initExpressions, initBlock, aggs, typeFactory);
+
+final PhysType accPhysType =
+PhysTypeImpl.of(typeFactory,
+typeFactory.createSyntheticType(aggStateTypes));
+
+declareParentAccumulator(initExpressions, initBlock, accPhysType);
+
+final Expression accumulatorInitializer =
+builder.append("accumulatorInitializer",
+Expressions.lambda(
+Function0.class,
+initBlock.toBlock()));
+
+// Function2 accumulatorAdder =
+// new Function2() {
+// public Object[] apply(Object[] acc, Employee in) {
+//  acc[0] = ((Integer) acc[0]) + 1;
+//  acc[1] = ((Integer) acc[1]) + in.salary;
+// return acc;
+// }
+// };
+final ParameterExpression inParameter =
+Expressions.parameter(inputPhysType.getJavaRowType(), "in");
+final ParameterExpression acc_ =
+Expressions.parameter(accPhysType.getJavaRowType(), "acc");
+
+createAccumulatorAdders(
+inParameter, aggs, accPhysType, acc_, inputPhysType, builder, 
implementor, typeFactory);
+
+final ParameterExpression lambdaFactory =
+Expressions.parameter(AggregateLambdaFactory.class,
+builder.newName("lambdaFactory"));
+
+implementLambdaFactory(builder, inputPhysType, aggs, 
accumulatorInitializer,
+false, lambdaFactory);
+
+final BlockBuilder resultBlock = new BlockBuilder();
+final List results = Expressions.list();
+final ParameterExpression key_;
+final Type keyType = keyPhysType.getJavaRowType();
+key_ = Expressions.parameter(keyType, "key");
+for (int j = 0; j < groupCount; j++) {
+  final Expression ref = keyPhysType.fieldReference(key_, j);
+  results.add(ref);
+}
+
+for (final AggImpState agg : aggs) {
+  results.add(
+  agg.implementor.implementResult(agg.context,
+  new AggResultContextImpl(resultBlock, agg.call, agg.state, key_,
+  keyPhysType)));
+}
+resultBlock.add(physType.record(results));
+
+final Expression keySelector_ =
+builder.append("keySelector",
+inputPhysType.generateSelector(parameter,
+groupSet.asList(),
+keyPhysType.getFormat()));
+// Generate the appropriate key Comparator.
+final Expression comparator = 
keyPhysType.generateComparator(getTraitSet().getCollation());

Review comment:
   I will need to double-check and probably revise this part to make the 
KeySelector match the KeyComparator.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org