[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902857#comment-15902857 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3472

> Remove preparation mapper of DataSetAggregate
>
> Key: FLINK-5963
> URL: https://issues.apache.org/jira/browse/FLINK-5963
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Assignee: Fabian Hueske
> Priority: Minor
> Fix For: 1.3.0
>
> With the new UDAGG interface we do not need the preparation mapper anymore.
> It adds overhead because
> - it is another operator
> - it prevents using {{AggregateFunction.accumulate()}} in a combiner or reducer.
> Hence, it should be removed.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902790#comment-15902790 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3472

Thanks @fhueske. I will rebase this, replace foreach with while loops, and merge it.
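Replacing `foreach` with `while` loops is a common micro-optimization on hot per-record paths in Scala: a `foreach` over `zipWithIndex` first builds an array of tuples and then calls a closure for every element, whereas a `while` loop over an index does neither. How much this matters in practice depends on the Scala version and the JIT. The sketch below contrasts both forms for a per-row accumulate step; `SimpleAgg`, `LoopSum`, `LoopCount` and `Loops` are hypothetical, simplified stand-ins and not Flink's `AggregateFunction` or `Row` API.

```scala
// Hypothetical, simplified types; NOT Flink's AggregateFunction or Row.
trait SimpleAgg {
  def accumulate(acc: Array[Long], value: Long): Unit
}

object LoopSum extends SimpleAgg {
  def accumulate(acc: Array[Long], value: Long): Unit = acc(0) += value
}

object LoopCount extends SimpleAgg {
  def accumulate(acc: Array[Long], value: Long): Unit = acc(0) += 1
}

object Loops {
  // foreach style: concise, but zipWithIndex allocates an array of tuples
  // and the body runs through a closure for every element.
  def accumulateForeach(
      aggs: Array[SimpleAgg],
      accs: Array[Array[Long]],
      inFields: Array[Int],
      row: Array[Long]): Unit =
    aggs.zipWithIndex.foreach { case (agg, i) =>
      agg.accumulate(accs(i), row(inFields(i)))
    }

  // while style: same effect with an explicit index and no intermediate tuples.
  def accumulateWhile(
      aggs: Array[SimpleAgg],
      accs: Array[Array[Long]],
      inFields: Array[Int],
      row: Array[Long]): Unit = {
    var i = 0
    while (i < aggs.length) {
      aggs(i).accumulate(accs(i), row(inFields(i)))
      i += 1
    }
  }
}
```

Both variants produce identical accumulator contents; the `while` form only trades brevity for fewer allocations and indirect calls per record.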
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901518#comment-15901518 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3472

Thanks for the reviews @shaoxuan-wang and @sunjincheng121. Will merge this PR.
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895927#comment-15895927 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3472

Hi @sunjincheng121,

thanks for your review. I addressed your comments.

Cheers, Fabian
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895924#comment-15895924 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298833

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+ preAggType: Option[TypeInformation[Row]],
+ finalAgg: GroupReduceFunction[Row, Row]) =
--- End diff --

OK
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895925#comment-15895925 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298836

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
+ * (DataSet) queries.
+ *
+ * @param aggregates The aggregate functions.
+ * @param aggInFields The positions of the aggregation input fields.
+ * @param groupingKeys The positions of the grouping keys in the input.
+ */
+class DataSetPreAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val groupingKeys: Array[Int])
+ extends AbstractRichFunction
+with GroupCombineFunction[Row, Row]
--- End diff --

OK
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895923#comment-15895923 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298822

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
+ * (DataSet) queries.
+ *
+ * @param aggregates The aggregate functions.
+ * @param aggInFields The positions of the aggregation input fields.
+ * @param gkeyOutMapping The mapping of group keys between input and output positions.
+ * @param aggOutMapping The mapping of aggregates to output positions.
+ * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
+ * @param finalRowArity The arity of the final resulting row.
+ */
+class DataSetAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val aggOutMapping: Array[(Int, Int)],
+private val gkeyOutMapping: Array[(Int, Int)],
+private val groupingSetsMapping: Array[(Int, Int)],
+private val finalRowArity: Int)
+ extends RichGroupReduceFunction[Row, Row] {
+
+ private var output: Row = _
+
+ private var intermediateGKeys: Option[Array[Int]] = None
+ private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
+ private var accumulators: Array[Accumulator] = _
+
+ override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(aggInFields)
+Preconditions.checkNotNull(aggOutMapping)
+Preconditions.checkNotNull(gkeyOutMapping)
+accumulators = new Array(aggregates.length)
--- End diff --

Good point! Thanks
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895769#comment-15895769 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104290767

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+ preAggType: Option[TypeInformation[Row]],
+ finalAgg: GroupReduceFunction[Row, Row]) =
--- End diff --

Can we format the code like this:
```
val (
  preAgg: Option[DataSetPreAggFunction],
  preAggType: Option[TypeInformation[Row]],
  finalAgg: GroupReduceFunction[Row, Row]) =
```
I am not sure; it is just a suggestion.
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895770#comment-15895770 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104290608

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
+ * (DataSet) queries.
+ *
+ * @param aggregates The aggregate functions.
+ * @param aggInFields The positions of the aggregation input fields.
+ * @param gkeyOutMapping The mapping of group keys between input and output positions.
+ * @param aggOutMapping The mapping of aggregates to output positions.
+ * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
+ * @param finalRowArity The arity of the final resulting row.
+ */
+class DataSetAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val aggOutMapping: Array[(Int, Int)],
+private val gkeyOutMapping: Array[(Int, Int)],
+private val groupingSetsMapping: Array[(Int, Int)],
+private val finalRowArity: Int)
+ extends RichGroupReduceFunction[Row, Row] {
+
+ private var output: Row = _
+
+ private var intermediateGKeys: Option[Array[Int]] = None
+ private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
+ private var accumulators: Array[Accumulator] = _
+
+ override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(aggInFields)
+Preconditions.checkNotNull(aggOutMapping)
+Preconditions.checkNotNull(gkeyOutMapping)
+accumulators = new Array(aggregates.length)
--- End diff --

Can we move those checks into the constructor?
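Moving the null checks into the constructor makes a misconfigured function fail as soon as it is created. Flink user functions are typically instantiated in the client program and shipped to the tasks in serialized form, so a constructor check would usually surface while the plan is being built rather than when `open()` runs on a worker. A minimal sketch of that split, assuming plain Scala `require` in place of Flink's `Preconditions.checkNotNull` and a hypothetical `CheckedAggFunction` with simplified parameter types: validation lives in the class body (which is the primary constructor), per-task state is initialized in `open()`.

```scala
// Hypothetical class; parameter types are simplified stand-ins, not Flink types.
class CheckedAggFunction(
    private val aggregates: Array[String],
    private val aggInFields: Array[Int],
    private val aggOutMapping: Array[(Int, Int)],
    private val gkeyOutMapping: Array[(Int, Int)])
  extends Serializable {

  // Statements in the class body run as part of the primary constructor,
  // so these checks throw IllegalArgumentException at construction time.
  require(aggregates != null, "aggregates must not be null")
  require(aggInFields != null, "aggInFields must not be null")
  require(aggOutMapping != null, "aggOutMapping must not be null")
  require(gkeyOutMapping != null, "gkeyOutMapping must not be null")

  // Per-task state: not serialized, only created in open().
  @transient private var accumulators: Array[Long] = _

  def open(): Unit = {
    accumulators = new Array[Long](aggregates.length)
  }

  def numAccumulators: Int = if (accumulators == null) 0 else accumulators.length
}
```

Constructing the function with a `null` argument fails immediately, while the accumulator array still only exists after `open()` has been called.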
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895771#comment-15895771 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104292001

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+ * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
+ * (DataSet) queries.
+ *
+ * @param aggregates The aggregate functions.
+ * @param aggInFields The positions of the aggregation input fields.
+ * @param groupingKeys The positions of the grouping keys in the input.
+ */
+class DataSetPreAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val groupingKeys: Array[Int])
+ extends AbstractRichFunction
+with GroupCombineFunction[Row, Row]
--- End diff --

Can we align `with` with `extends`? The suggestion just refers to: [Scala Class/Object/Trait constructor declarations](http://docs.scala-lang.org/style/declarations)
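One way to lay out such a declaration along the lines of this suggestion: constructor parameters indented one level deeper than the `extends` clause, and every `with` aligned under `extends`. The exact indentation widths (four spaces for parameters, two for `extends`/`with`) are an assumption here; the referenced style guide is the authority. `BaseFunction`, `Combiner`, `PartitionMapper` and `PreAggSketch` are hypothetical stand-ins for `AbstractRichFunction`, `GroupCombineFunction`, `MapPartitionFunction` and `DataSetPreAggFunction`; only the layout is the point.

```scala
// Hypothetical stand-ins for Flink's AbstractRichFunction,
// GroupCombineFunction and MapPartitionFunction (layout demo only).
abstract class BaseFunction extends Serializable

trait Combiner[IN, OUT] {
  def combine(values: Iterable[IN]): Seq[OUT]
}

trait PartitionMapper[IN, OUT] {
  def mapPartition(values: Iterable[IN]): Seq[OUT]
}

// Parameters indented four spaces; `extends` and each `with` aligned at two.
class PreAggSketch(
    private val groupingKeys: Array[Int],
    private val aggInFields: Array[Int])
  extends BaseFunction
  with Combiner[Array[Long], Array[Long]]
  with PartitionMapper[Array[Long], Array[Long]] {

  // Trivial body so the sketch compiles: emits one row holding the input count.
  override def combine(values: Iterable[Array[Long]]): Seq[Array[Long]] =
    Seq(Array(values.size.toLong))

  override def mapPartition(values: Iterable[Array[Long]]): Seq[Array[Long]] =
    combine(values)
}
```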
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895561#comment-15895561 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104282353

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+ preAggType: Option[TypeInformation[Row]],
+ finalAgg: GroupReduceFunction[Row, Row]) =
+ AggregateUtil.createDataSetAggregateFunctions(
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+inGroupingSet)
 val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
-val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
-if (groupingKeys.length > 0) {
+if (grouping.length > 0) {
 // grouped aggregation
 val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
 s"select: ($aggString)"
- mappedInput.asInstanceOf[DataSet[Row]]
-.groupBy(groupingKeys: _*)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggOpName)
+ if (preAgg.isDefined) {
+inputDS
+ // pre-aggregation
+ .groupBy(grouping: _*)
+ .combineGroup(preAgg.get)
+ .returns(preAggType.get)
+ .name(aggOpName)
+ // final aggregation
+ .groupBy(grouping.indices: _*)
--- End diff --

The pre-aggregation function modifies the schema of the rows: it puts all grouping keys first, followed by all accumulators. Therefore, the subsequent final aggregation needs to group on the first `n` fields, where `n` is the number of grouping keys. Before this change, the prepare mapper changed the layout.
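A small sketch of the layout change described above, with rows modelled as plain `Array[Any]` (an assumption; Flink uses `Row`) and a hypothetical `PreAggLayout` helper: the pre-aggregated row holds the grouping keys first, in the order given by `grouping`, followed by the accumulators, so the final aggregation has to group on positions `0 until grouping.length`, which is exactly `grouping.indices`.

```scala
object PreAggLayout {
  // Hypothetical helper mirroring the described layout: grouping keys first
  // (in the order listed in `grouping`), then one accumulator per aggregate.
  def preAggRow(input: Array[Any], grouping: Array[Int], accumulators: Array[Any]): Array[Any] =
    grouping.map(i => input(i)) ++ accumulators

  // Key positions for the final groupBy: the first grouping.length fields.
  def finalGroupingKeys(grouping: Array[Int]): Array[Int] = grouping.indices.toArray
}
```

For example, with `grouping = Array(2, 0)` an input row `("x", 42, "k2", 7.5)` becomes `("k2", "x", acc)` after pre-aggregation, so grouping again on the original positions 2 and 0 would pick the wrong fields; the final aggregation groups on fields 0 and 1 instead.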
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895558#comment-15895558 ]

ASF GitHub Bot commented on FLINK-5963:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104282131

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
- namedAggregates,
- grouping,
- inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
- namedAggregates,
- inputType,
- rowRelDataType,
- grouping,
- inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+ preAggType: Option[TypeInformation[Row]],
+ finalAgg: GroupReduceFunction[Row, Row]) =
+ AggregateUtil.createDataSetAggregateFunctions(
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+inGroupingSet)
 val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
-val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
-if (groupingKeys.length > 0) {
+if (grouping.length > 0) {
 // grouped aggregation
 val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
 s"select: ($aggString)"
- mappedInput.asInstanceOf[DataSet[Row]]
-.groupBy(groupingKeys: _*)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggOpName)
+ if (preAgg.isDefined) {
+inputDS
+ // pre-aggregation
+ .groupBy(grouping: _*)
+ .combineGroup(preAgg.get)
+ .returns(preAggType.get)
+ .name(aggOpName)
+ // final aggregation
+ .groupBy(grouping.indices: _*)
--- End diff --

I played around with this PR in different test modes, and everything works very well. Just curious: why do you use `grouping.indices` as the grouping key here?
[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate
[ https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895178#comment-15895178 ]

ASF GitHub Bot commented on FLINK-5963:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/3472

[FLINK-5963] [table] Remove prepare mapper of DataSetAggregate.

We remove the preparation mapper to
- save one operator
- be able to apply `AggregateFunction.accumulate()` in a `GroupCombineFunction` or `MapPartitionFunction` for pre-aggregation or in a `GroupReduceFunction` for final aggregation.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableRmPrepareMap

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3472.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3472
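To illustrate the second point, here is a plain-Scala sketch of two-phase aggregation in which raw input rows go straight into `accumulate()` during pre-aggregation (the combiner role), and the partial accumulators are merged in the final aggregation (the reducer role), with no separate prepare map in front. `SumAccumulator`, `SumAgg` and `TwoPhaseSum` are hypothetical; they only loosely mirror the accumulator idea and are not Flink's `AggregateFunction` API or its `DataSet` operators.

```scala
// Hypothetical accumulator and aggregate; NOT Flink's AggregateFunction API.
final class SumAccumulator(var sum: Long)

object SumAgg {
  def createAccumulator(): SumAccumulator = new SumAccumulator(0L)
  def accumulate(acc: SumAccumulator, value: Long): Unit = acc.sum += value
  // Merges b into a (mutating a) and returns a.
  def merge(a: SumAccumulator, b: SumAccumulator): SumAccumulator = { a.sum += b.sum; a }
  def getValue(acc: SumAccumulator): Long = acc.sum
}

object TwoPhaseSum {
  // Pre-aggregation (combiner role) over one partition: raw (key, value)
  // rows are fed directly into accumulate(); no prepare step runs first.
  def combine(partition: Seq[(String, Long)]): Map[String, SumAccumulator] =
    partition.groupBy(_._1).map { case (key, rows) =>
      val acc = SumAgg.createAccumulator()
      rows.foreach { case (_, v) => SumAgg.accumulate(acc, v) }
      key -> acc
    }

  // Final aggregation (reducer role): merge the partial accumulators per key.
  def reduce(partials: Seq[Map[String, SumAccumulator]]): Map[String, Long] =
    partials.flatten.groupBy(_._1).map { case (key, entries) =>
      key -> SumAgg.getValue(entries.map(_._2).reduce((a, b) => SumAgg.merge(a, b)))
    }
}
```

For instance, combining the partitions `("a",1), ("b",2), ("a",3)` and `("a",10), ("c",5)` separately and then reducing the partial results yields `a -> 14`, `b -> 2`, `c -> 5`. Because the combiner already works on raw rows, no extra operator is needed to reshape the input first.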