[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5706


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-20 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175825522
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Yes. This was kind of confusing to me; we should clean this up when adding 
DISTINCT support. Thanks for the update, @fhueske.


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175716765
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Hmm, that's a good point. 
In fact, there won't be any plan with a `DISTINCT` aggregation in a 
`LogicalWindowAggregate` because `LogicalWindowAggregateRule` prevents the 
translation of (`Calc(TUMBLE) -> Aggregate()`) into (`WindowAggregate(TUMBLE)`) 
if there is a distinct aggregate. This prevents window aggregates with 
`DISTINCT` in SQL queries from being translated into a `WindowAggregate`. The 
Table API does not even have an API to define such queries.

So, I'd simply remove the `containsDistinctCall()` check for now. We should 
definitely clean this up when we add support for DISTINCT aggregates.

@walterddr, are you fine with this?
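A minimal, self-contained sketch of the predicate once the `containsDistinctCall()` check is dropped. It does not use Calcite: `Kind` is a hypothetical stand-in for `SqlKind`, and the contents of `AVG_AGG_FUNCTIONS` are an assumption about the Calcite version in use, so treat it as an illustration of the matching logic rather than the actual rule.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

final class WindowAggMatcher {
  /** Hypothetical stand-in for the aggregate members of Calcite's SqlKind. */
  enum Kind { AVG, SUM, COUNT, MIN, MAX, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP }

  /** Assumed contents of SqlKind.AVG_AGG_FUNCTIONS in the Calcite version of the time. */
  static final Set<Kind> AVG_AGG_FUNCTIONS = EnumSet.of(
      Kind.AVG, Kind.STDDEV_POP, Kind.STDDEV_SAMP, Kind.VAR_POP, Kind.VAR_SAMP);

  /**
   * Mirrors the proposed matches(): AVG is supported natively, the other
   * AVG_AGG_FUNCTIONS must be reduced first, everything else is supported.
   * The containsDistinctCall() check has been dropped.
   */
  static boolean matches(List<Kind> aggCallKinds) {
    return aggCallKinds.stream()
        .allMatch(k -> k == Kind.AVG || !AVG_AGG_FUNCTIONS.contains(k));
  }

  public static void main(String[] args) {
    System.out.println(matches(Arrays.asList(Kind.AVG, Kind.SUM)));        // true
    System.out.println(matches(Arrays.asList(Kind.COUNT, Kind.VAR_SAMP))); // false
  }
}
```

Like Scala's `forall`, `allMatch` returns true for an empty list, so an aggregate without aggregate calls still matches.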



---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-18 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175296744
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      // we support AVG
+      case SqlKind.AVG => true
+      // but none of the other AVG agg functions
+      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+      case _ => true
+    }
+
+    !agg.containsDistinctCall() && supported
--- End diff --

Shouldn't the logical rule support distinct calls here? It seems like the 
error was previously thrown in `DataSetWindowAggregateRule` and 
`DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test 
to further clarify this change?



---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175230116
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

Sounds good to me. I'll update the PR.


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-16 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175183811
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

How about `SqlKind.AVG_AGG_FUNCTIONS.contains(kind) && kind != SqlKind.SUM 
&& kind != SqlKind.AVG`?
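To compare the suggestion with the explicit list in the diff, here is a sketch that evaluates both checks over a small, hypothetical set of aggregate kinds. `Kind` stands in for `SqlKind`, and the membership of `AVG_AGG_FUNCTIONS` (AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP) is an assumption, not read from Calcite.

```java
import java.util.EnumSet;
import java.util.Set;

final class ReducePredicates {
  /** Hypothetical stand-in for the aggregate members of Calcite's SqlKind. */
  enum Kind { AVG, SUM, COUNT, MIN, MAX, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP }

  /** Assumed contents of SqlKind.AVG_AGG_FUNCTIONS. */
  static final Set<Kind> AVG_AGG_FUNCTIONS = EnumSet.of(
      Kind.AVG, Kind.STDDEV_POP, Kind.STDDEV_SAMP, Kind.VAR_POP, Kind.VAR_SAMP);

  /** The explicit list from the diff: kinds that must be reduced before translation. */
  static boolean needsReductionExplicit(Kind k) {
    switch (k) {
      case STDDEV_POP: case STDDEV_SAMP: case VAR_POP: case VAR_SAMP:
        return true;
      default:
        return false;
    }
  }

  /** The suggested set-based check. */
  static boolean needsReductionSuggested(Kind k) {
    return AVG_AGG_FUNCTIONS.contains(k) && k != Kind.SUM && k != Kind.AVG;
  }

  /** Kinds on which the two checks disagree; empty if they are equivalent. */
  static Set<Kind> disagreements() {
    Set<Kind> diff = EnumSet.noneOf(Kind.class);
    for (Kind k : Kind.values()) {
      if (needsReductionExplicit(k) != needsReductionSuggested(k)) {
        diff.add(k);
      }
    }
    return diff;
  }

  public static void main(String[] args) {
    System.out.println(disagreements()); // []
  }
}
```

Under the assumed membership, the two checks agree on every kind and the `kind != SqlKind.SUM` term changes nothing; it only matters if the set in the Calcite version at hand contains `SUM`.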


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175181100
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led 
to several test failures. These tests expected an `AVG` aggregation function, 
which had now been replaced by `SUM / COUNT`.
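For reference, the reduction expresses AVG, VAR_POP, VAR_SAMP, STDDEV_POP and STDDEV_SAMP through SUM(x), SUM(x * x) and COUNT(x), following the rewrites described in the class Javadoc of Calcite's `AggregateReduceFunctionsRule` (e.g. VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)). The sketch below evaluates those formulas on a plain `double[]`; it is an arithmetic illustration, not the planner rewrite itself, and uses `NaN` where SQL would return NULL.

```java
final class AggReduction {
  /** AVG(x) rewritten as SUM(x) / COUNT(x). */
  static double avg(double[] xs) {
    double sum = 0;
    for (double x : xs) {
      sum += x;
    }
    return sum / xs.length;
  }

  /** Shared numerator: SUM(x * x) - SUM(x) * SUM(x) / COUNT(x). */
  private static double sumSquaredDeviations(double[] xs) {
    double sum = 0;
    double sumSq = 0;
    for (double x : xs) {
      sum += x;
      sumSq += x * x;
    }
    return sumSq - sum * sum / xs.length;
  }

  /** VAR_POP(x): divide the numerator by COUNT(x). */
  static double varPop(double[] xs) {
    return sumSquaredDeviations(xs) / xs.length;
  }

  /** VAR_SAMP(x): divide by COUNT(x) - 1; SQL yields NULL for one row, NaN here. */
  static double varSamp(double[] xs) {
    return xs.length <= 1 ? Double.NaN : sumSquaredDeviations(xs) / (xs.length - 1);
  }

  /** STDDEV_POP(x) and STDDEV_SAMP(x): square roots of the variances. */
  static double stddevPop(double[] xs) {
    return Math.sqrt(varPop(xs));
  }

  static double stddevSamp(double[] xs) {
    return Math.sqrt(varSamp(xs));
  }

  public static void main(String[] args) {
    double[] xs = {2, 4, 4, 4, 5, 5, 7, 9};
    System.out.println(avg(xs));       // 5.0
    System.out.println(varPop(xs));    // 4.0
    System.out.println(stddevPop(xs)); // 2.0
  }
}
```

The sum-of-squares form needs only one pass and plain SUM/COUNT aggregates, which is why it suits the rewrite, but it can lose precision when values are large relative to their spread, since it subtracts two large, nearly equal numbers.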


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175056688
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // we do not support these functions natively
+    // they have to be converted using the WindowAggregateReduceFunctionsRule
+    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --

We have built-in functions for AVG (which we don't really need anymore) 
and SUM, so we could translate such plans.
But I agree, using `SqlKind.AVG_AGG_FUNCTIONS.contains()` is better.


---


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-16 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175055646
  
--- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java ---
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
--- End diff --

Yes, I agree. It would be much better to have this code in Calcite. 
However, the changes are very Flink-specific (we need to add a few fields 
to the projection).
On the other hand, it's just moving some code into a protected function, so 
there is no change in functionality and only a few lines are touched.

I'll create a JIRA in Calcite and reference the issue. If Calcite does not 
want the change, we can keep the class in Flink.
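The refactoring described here follows a template-method pattern: the reduction logic stays in the base rule, and only the step that builds the projection becomes an overridable protected method. This is a minimal sketch with hypothetical class and field names (`BaseReduceRule`, `WindowReduceRule`, `window_start`, `window_end`). Expressions are plain strings rather than Calcite `RexNode`s, and nothing here reflects the actual method names in the copied rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical base rule whose projection step is a protected hook. */
class BaseReduceRule {
  /** Template method: shared logic stays here, only the projection is overridable. */
  final List<String> reduce(List<String> reducedExprs) {
    // The shared reduction logic would run here (omitted in this sketch).
    return buildProjection(reducedExprs);
  }

  /** Default behaviour: project exactly the reduced expressions. */
  protected List<String> buildProjection(List<String> reducedExprs) {
    return new ArrayList<>(reducedExprs);
  }
}

/** Hypothetical subclass that appends extra fields to the projection. */
class WindowReduceRule extends BaseReduceRule {
  @Override
  protected List<String> buildProjection(List<String> reducedExprs) {
    List<String> projection = super.buildProjection(reducedExprs);
    projection.add("window_start"); // hypothetical extra field
    projection.add("window_end");   // hypothetical extra field
    return projection;
  }

  public static void main(String[] args) {
    System.out.println(new WindowReduceRule().reduce(Arrays.asList("a", "$f1 / $f2")));
  }
}
```

Running `main` prints `[a, $f1 / $f2, window_start, window_end]`. Because the base class keeps its default behaviour, such a change would not alter what the rule does for existing Calcite users.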


---