[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5706 ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175825522 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + // we support AVG + case SqlKind.AVG => true + // but none of the other AVG agg functions + case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false + case _ => true +} + +!agg.containsDistinctCall() && supported --- End diff -- Yes. This was kinda confusing to me, we should clean this up when adding DISTINCT support. Thanks for the update @fhueske ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175716765 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + // we support AVG + case SqlKind.AVG => true + // but none of the other AVG agg functions + case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false + case _ => true +} + +!agg.containsDistinctCall() && supported --- End diff -- Hmm, that's a good point. In fact, there won't be any plan with a `DISTINCT` aggregation in a `LogicalWindowAggregate` because `LogicalWindowAggregateRule` prevents translation of (`Calc(TUMBLE) -> Aggregate()`) into (`WindowAggregate(TUMBLE)`) if there is a distinct aggregate. As a result, window aggregates with a distinct aggregate in SQL queries are never translated into `WindowAggregate`. The Table API does not even have an API to define such queries. So, I'd simply remove the `containsDistinctCall()` check for now. We should definitely clean this up when we add support for DISTINCT aggregates. @walterddr, are you fine with this? ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user walterddr commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175296744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + // we support AVG + case SqlKind.AVG => true + // but none of the other AVG agg functions + case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false + case _ => true +} + +!agg.containsDistinctCall() && supported --- End diff -- Shouldn't the logical rule support distinct calls here? It seems that previously the errors were thrown in the `DataSetWindowAggregateRule` and `DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test to further clarify this change? ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175230116 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false --- End diff -- sounds good to me. Will update the PR ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user suez1224 commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175183811 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false --- End diff -- How about `SqlKind.AVG_AGG_FUNCTIONS.contains(kind) && kind != SqlKind.SUM && kind != SqlKind.AVG`? ---
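The predicate discussed in this thread (accept `AVG`, reject the other members of `SqlKind.AVG_AGG_FUNCTIONS`, accept everything else) can be illustrated with a small dependency-free Java sketch. The `Kind` enum and the class name are hypothetical stand-ins for Calcite's `SqlKind`, and the contents of the `AVG_AGG_FUNCTIONS` set below are an assumption inferred from the diffs quoted here, not copied from Calcite.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

public class SupportedAggCheck {
    // Hypothetical stand-in for org.apache.calcite.sql.SqlKind, reduced to a few kinds.
    enum Kind { AVG, SUM, COUNT, MIN, MAX, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP }

    // Assumed contents of SqlKind.AVG_AGG_FUNCTIONS (inferred from the thread).
    static final Set<Kind> AVG_AGG_FUNCTIONS = EnumSet.of(
        Kind.AVG, Kind.STDDEV_POP, Kind.STDDEV_SAMP, Kind.VAR_POP, Kind.VAR_SAMP);

    // AVG is supported natively; the other AVG-style functions must be reduced first.
    static boolean isSupported(Kind kind) {
        return kind == Kind.AVG || !AVG_AGG_FUNCTIONS.contains(kind);
    }

    // Mirrors the forall over all aggregate calls of a window aggregate.
    static boolean allSupported(List<Kind> kinds) {
        return kinds.stream().allMatch(SupportedAggCheck::isSupported);
    }

    public static void main(String[] args) {
        System.out.println(allSupported(Arrays.asList(Kind.AVG, Kind.SUM)));     // true
        System.out.println(allSupported(Arrays.asList(Kind.SUM, Kind.VAR_POP))); // false
    }
}
```

Under the assumed set contents, `SUM` is not a member, so an explicit `kind != SUM` exclusion would have no effect in this sketch; only `AVG` needs the special case.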
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175181100 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false --- End diff -- Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led to several test failures. These tests expected an `AVG` aggregation function, which was now replaced by `SUM / COUNT`. ---
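For readers unfamiliar with the reduction mentioned above, here is a minimal Java sketch of how `AVG` and the variance functions can be expressed through `SUM` and `COUNT`. The class and method names are hypothetical. The `AVG = SUM / COUNT` rewrite is stated in the thread; the variance and standard deviation formulas are the textbook ones, and whether the reduce rule emits exactly these expressions is an assumption. NULL handling and the empty or single-row cases are left out.

```java
public class ReducedAggregates {
    // The three simple aggregates everything else is derived from.
    final double sum;    // SUM(x)
    final double sumSq;  // SUM(x * x)
    final long count;    // COUNT(x)

    ReducedAggregates(double sum, double sumSq, long count) {
        this.sum = sum;
        this.sumSq = sumSq;
        this.count = count;
    }

    // Accumulates SUM(x), SUM(x * x) and COUNT(x) over the given values.
    static ReducedAggregates of(double... xs) {
        double s = 0.0;
        double sq = 0.0;
        for (double x : xs) {
            s += x;
            sq += x * x;
        }
        return new ReducedAggregates(s, sq, xs.length);
    }

    // AVG(x) = SUM(x) / COUNT(x)
    double avg() { return sum / count; }

    // VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
    double varPop() { return (sumSq - sum * sum / count) / count; }

    // VAR_SAMP(x) uses COUNT(x) - 1 as the final divisor.
    double varSamp() { return (sumSq - sum * sum / count) / (count - 1); }

    // STDDEV_POP(x) is the square root of VAR_POP(x).
    double stddevPop() { return Math.sqrt(varPop()); }

    public static void main(String[] args) {
        ReducedAggregates r = ReducedAggregates.of(2, 4, 4, 4, 5, 5, 7, 9);
        System.out.println(r.avg());       // 5.0
        System.out.println(r.varPop());    // 4.0
        System.out.println(r.stddevPop()); // 2.0
    }
}
```

This also shows why a test that expects an `AVG` call in the plan fails once `AVG` is reduced: the plan then contains `SUM` and `COUNT` calls plus a division instead.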
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175056688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala --- @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter FlinkConventions.LOGICAL, "FlinkLogicalWindowAggregateConverter") { + override def matches(call: RelOptRuleCall): Boolean = { +val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate] + +// we do not support these functions natively +// they have to be converted using the WindowAggregateReduceFunctionsRule +val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall { + case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false --- End diff -- We have a built-in function for AVG (which we don't really need anymore) and SUM, so we could translate such plans. But I agree, using SqlKind.AVG_AGG_FUNCTIONS.contains() is better. ---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5706#discussion_r175055646 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java --- @@ -0,0 +1,590 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.rel.rules; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.logical.LogicalAggregate; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.calcite.util.CompositeList; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/* + * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE. --- End diff -- Yes, I agree. It would be much better to have this code in Calcite. However, the changes are very Flink-specific (we need to add a few fields to the projection). OTOH, it's just moving some code into a protected function, so there is no change in functionality and only a few lines are touched. I'll create a JIRA in Calcite and reference the issue. In case Calcite does not want the change, we can keep the class in Flink. ---