[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902857#comment-15902857
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3472


> Remove preparation mapper of DataSetAggregate
> ---------------------------------------------
>
>                 Key: FLINK-5963
>                 URL: https://issues.apache.org/jira/browse/FLINK-5963
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Assignee: Fabian Hueske
>            Priority: Minor
>             Fix For: 1.3.0
>
>
> With the new UDAGG interface, we no longer need the preparation mapper.
> It adds overhead because
> - it is an additional operator
> - it prevents the use of {{AggregateFunction.accumulate()}} in a combiner or
> reducer.
> Hence, it should be removed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15902790#comment-15902790
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/3472
  
Thanks @fhueske. I will rebase this, replace the `foreach` calls with `while`
loops, and merge it.
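
As a rough illustration of the `foreach` to `while` rewrite mentioned above, here is a minimal sketch in plain Scala. The object `LoopSketch` and both methods are hypothetical names for this example and are not taken from the PR; the thread does not state the motivation, which is presumably per-record overhead in runtime functions.

```scala
// Illustrative only: not code from the pull request.
object LoopSketch {
  /** Sums an array with an index-based `while` loop. */
  def sumWhile(values: Array[Long]): Long = {
    var i = 0
    var total = 0L
    while (i < values.length) {
      total += values(i)
      i += 1
    }
    total
  }

  /** Computes the same sum with `foreach`, which applies a function to each element. */
  def sumForeach(values: Array[Long]): Long = {
    var total = 0L
    values.foreach(v => total += v)
    total
  }
}
```

Both variants return the same result; the `while` version only trades brevity for an explicit loop counter.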




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15901518#comment-15901518
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3472
  
Thanks for the reviews, @shaoxuan-wang and @sunjincheng121.
I will merge this PR.




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895927#comment-15895927
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3472
  
Hi @sunjincheng121, thanks for your review. I addressed your comments.
Cheers, Fabian




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895924#comment-15895924
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298833
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
-  namedAggregates,
-  grouping,
-  inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
-  namedAggregates,
-  inputType,
-  rowRelDataType,
-  grouping,
-  inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+  preAggType: Option[TypeInformation[Row]],
+  finalAgg: GroupReduceFunction[Row, Row]) =
--- End diff --

OK




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895925#comment-15895925
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298836
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
+  * (DataSet) queries.
+  *
+  * @param aggregates The aggregate functions.
+  * @param aggInFields The positions of the aggregation input fields.
+  * @param groupingKeys The positions of the grouping keys in the input.
+  */
+class DataSetPreAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val groupingKeys: Array[Int])
+  extends AbstractRichFunction
+with GroupCombineFunction[Row, Row]
--- End diff --

OK




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895923#comment-15895923
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104298822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
+  * (DataSet) queries.
+  *
+  * @param aggregates The aggregate functions.
+  * @param aggInFields The positions of the aggregation input fields.
+  * @param gkeyOutMapping The mapping of group keys between input and output positions.
+  * @param aggOutMapping  The mapping of aggregates to output positions.
+  * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
+  * @param finalRowArity The arity of the final resulting row.
+  */
+class DataSetAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val aggOutMapping: Array[(Int, Int)],
+private val gkeyOutMapping: Array[(Int, Int)],
+private val groupingSetsMapping: Array[(Int, Int)],
+private val finalRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var output: Row = _
+
+  private var intermediateGKeys: Option[Array[Int]] = None
+  private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
+  private var accumulators: Array[Accumulator] = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(aggInFields)
+Preconditions.checkNotNull(aggOutMapping)
+Preconditions.checkNotNull(gkeyOutMapping)
+accumulators = new Array(aggregates.length)
--- End diff --

Good point! Thanks.




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895769#comment-15895769
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104290767
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
-  namedAggregates,
-  grouping,
-  inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
-  namedAggregates,
-  inputType,
-  rowRelDataType,
-  grouping,
-  inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+  preAggType: Option[TypeInformation[Row]],
+  finalAgg: GroupReduceFunction[Row, Row]) =
--- End diff --

Can we format the code like this?

```
val (
  preAgg: Option[DataSetPreAggFunction],
  preAggType: Option[TypeInformation[Row]],
  finalAgg: GroupReduceFunction[Row, Row]) =
```
I am not sure; it is just a suggestion.




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895770#comment-15895770
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104290608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * [[RichGroupReduceFunction]] to compute aggregates that do not support preaggregation for batch
+  * (DataSet) queries.
+  *
+  * @param aggregates The aggregate functions.
+  * @param aggInFields The positions of the aggregation input fields.
+  * @param gkeyOutMapping The mapping of group keys between input and output positions.
+  * @param aggOutMapping  The mapping of aggregates to output positions.
+  * @param groupingSetsMapping The mapping of grouping set keys between input and output positions.
+  * @param finalRowArity The arity of the final resulting row.
+  */
+class DataSetAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val aggOutMapping: Array[(Int, Int)],
+private val gkeyOutMapping: Array[(Int, Int)],
+private val groupingSetsMapping: Array[(Int, Int)],
+private val finalRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] {
+
+  private var output: Row = _
+
+  private var intermediateGKeys: Option[Array[Int]] = None
+  private val aggsWithIdx: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex
+  private var accumulators: Array[Accumulator] = _
+
+  override def open(config: Configuration) {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(aggInFields)
+Preconditions.checkNotNull(aggOutMapping)
+Preconditions.checkNotNull(gkeyOutMapping)
+accumulators = new Array(aggregates.length)
--- End diff --

Can we move those checks into the constructor?
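
A minimal sketch of what constructor-time checks could look like in Scala. The class `CheckedFunction` and its parameters are hypothetical, and `java.util.Objects.requireNonNull` is used here in place of Flink's `Preconditions.checkNotNull` so that the example has no Flink dependency; this is not the code that was merged.

```scala
import java.util.Objects

// Hypothetical class for illustration; not the PR's DataSetAggFunction.
class CheckedFunction(
    private val aggregates: Array[String],
    private val aggInFields: Array[Int]) {

  // Statements in a Scala class body are part of the primary constructor,
  // so these checks fail at construction time instead of later in open().
  Objects.requireNonNull(aggregates, "aggregates")
  Objects.requireNonNull(aggInFields, "aggInFields")

  /** Number of aggregates handled by this function. */
  def arity: Int = aggregates.length
}
```

With this layout, `new CheckedFunction(null, Array(0))` throws a `NullPointerException` immediately, whereas checks placed in `open()` would only run once the function is opened by the runtime.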





[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895771#comment-15895771
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104292001
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+
+/**
+  * [[GroupCombineFunction]] and [[MapPartitionFunction]] to compute pre-aggregates for batch
+  * (DataSet) queries.
+  *
+  * @param aggregates The aggregate functions.
+  * @param aggInFields The positions of the aggregation input fields.
+  * @param groupingKeys The positions of the grouping keys in the input.
+  */
+class DataSetPreAggFunction(
+private val aggregates: Array[AggregateFunction[_ <: Any]],
+private val aggInFields: Array[Int],
+private val groupingKeys: Array[Int])
+  extends AbstractRichFunction
+with GroupCombineFunction[Row, Row]
--- End diff --

Can we align `with` and `extends`?
This suggestion follows the Scala style guide: [Scala Class/Object/Trait
constructor declarations](http://docs.scala-lang.org/style/declarations)




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895561#comment-15895561
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104282353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
-  namedAggregates,
-  grouping,
-  inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
-  namedAggregates,
-  inputType,
-  rowRelDataType,
-  grouping,
-  inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+  preAggType: Option[TypeInformation[Row]],
+  finalAgg: GroupReduceFunction[Row, Row]) =
+  AggregateUtil.createDataSetAggregateFunctions(
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+inGroupingSet)
 
 val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
 val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
-val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 
 val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
-if (groupingKeys.length > 0) {
+if (grouping.length > 0) {
   // grouped aggregation
   val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
 s"select: ($aggString)"
 
-  mappedInput.asInstanceOf[DataSet[Row]]
-.groupBy(groupingKeys: _*)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggOpName)
+  if (preAgg.isDefined) {
+inputDS
+  // pre-aggregation
+  .groupBy(grouping: _*)
+  .combineGroup(preAgg.get)
+  .returns(preAggType.get)
+  .name(aggOpName)
+  // final aggregation
+  .groupBy(grouping.indices: _*)
--- End diff --

The pre-aggregation function modifies the schema of the rows: it puts all
grouping keys first, followed by all accumulators. Therefore, the subsequent
final aggregation needs to group on the first `n` fields, where `n` is the
number of grouping keys.
Previously, the prepare mapper changed the layout.
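
To illustrate the layout change described in this comment, here is a minimal sketch in plain Scala with no Flink dependency. The object `PreAggLayout`, its helper methods, and the use of `Vector[Any]` in place of Flink's `Row` are assumptions made for this example, not code from the PR.

```scala
// Illustrative only: plain Scala collections stand in for Flink's Row type.
object PreAggLayout {
  /** Builds a pre-aggregated row: grouping keys first, then the accumulators. */
  def preAggRow(
      input: Vector[Any],
      grouping: Array[Int],
      accumulators: Vector[Any]): Vector[Any] =
    grouping.toVector.map(i => input(i)) ++ accumulators

  /** Key positions for the final aggregation: the first n fields. */
  def finalGroupingKeys(grouping: Array[Int]): Array[Int] =
    grouping.indices.toArray

  def main(args: Array[String]): Unit = {
    // Input row (a, b, c, d), grouped on fields 3 and 1.
    val grouping = Array(3, 1)
    val row = preAggRow(Vector("a", "b", "c", "d"), grouping, Vector(42L))
    println(row)                                        // prints Vector(d, b, 42)
    println(finalGroupingKeys(grouping).mkString(",")) // prints 0,1
  }
}
```

The keys that sat at positions 3 and 1 in the input end up at positions 0 and 1 of the pre-aggregated row, which is why the second `groupBy` uses `grouping.indices` rather than `grouping`.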




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895558#comment-15895558
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3472#discussion_r104282131
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
 ---
@@ -87,47 +89,67 @@ class DataSetAggregate(
 
   override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
-val groupingKeys = grouping.indices.toArray
-
-val mapFunction = AggregateUtil.createPrepareMapFunction(
-  namedAggregates,
-  grouping,
-  inputType)
-
-val groupReduceFunction = AggregateUtil.createAggregateGroupReduceFunction(
-  namedAggregates,
-  inputType,
-  rowRelDataType,
-  grouping,
-  inGroupingSet)
+val (preAgg: Option[DataSetPreAggFunction],
+  preAggType: Option[TypeInformation[Row]],
+  finalAgg: GroupReduceFunction[Row, Row]) =
+  AggregateUtil.createDataSetAggregateFunctions(
+namedAggregates,
+inputType,
+rowRelDataType,
+grouping,
+inGroupingSet)
 
 val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
 val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
-val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS.map(mapFunction).name(prepareOpName)
 
 val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
-if (groupingKeys.length > 0) {
+if (grouping.length > 0) {
   // grouped aggregation
   val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
 s"select: ($aggString)"
 
-  mappedInput.asInstanceOf[DataSet[Row]]
-.groupBy(groupingKeys: _*)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggOpName)
+  if (preAgg.isDefined) {
+inputDS
+  // pre-aggregation
+  .groupBy(grouping: _*)
+  .combineGroup(preAgg.get)
+  .returns(preAggType.get)
+  .name(aggOpName)
+  // final aggregation
+  .groupBy(grouping.indices: _*)
--- End diff --

I played around with this PR in different test modes, and everything works
very well. Just curious: why do you use `grouping.indices` as the grouping key here?




[jira] [Commented] (FLINK-5963) Remove preparation mapper of DataSetAggregate

2017-03-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895178#comment-15895178
 ] 

ASF GitHub Bot commented on FLINK-5963:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/3472

[FLINK-5963] [table] Remove prepare mapper of DataSetAggregate.

We remove the preparation mapper to 
- save one operator
- be able to apply `AggregateFunction.accumulate()` in a 
`GroupCombineFunction` or `MapPartitionFunction` for pre-aggregation or in a 
`GroupReduceFunction` for final aggregation.
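
The split between pre-aggregation and final aggregation can be sketched in plain Scala as follows. Everything in `AccumulateSketch` is a hypothetical stand-in and not Flink's `AggregateFunction` API; in particular, the `merge` step is an assumption about how partial accumulators are combined in the final aggregation.

```scala
// Hypothetical stand-in for an aggregate function; NOT Flink's API.
object AccumulateSketch {
  /** Accumulator for an average: running sum and count. */
  final case class AvgAcc(sum: Long, count: Long)

  /** Folds one raw input value into an accumulator. */
  def accumulate(acc: AvgAcc, value: Long): AvgAcc =
    AvgAcc(acc.sum + value, acc.count + 1)

  /** Combines two partial accumulators. */
  def merge(a: AvgAcc, b: AvgAcc): AvgAcc =
    AvgAcc(a.sum + b.sum, a.count + b.count)

  /** Extracts the final result from an accumulator. */
  def getValue(acc: AvgAcc): Double =
    if (acc.count == 0) Double.NaN else acc.sum.toDouble / acc.count

  /** Pre-aggregation: accumulate raw values of one partition directly. */
  def preAggregate(partition: Seq[Long]): AvgAcc =
    partition.foldLeft(AvgAcc(0L, 0L))(accumulate)

  /** Final aggregation: merge the partial accumulators and emit the result. */
  def finalAggregate(partials: Seq[AvgAcc]): Double =
    getValue(partials.foldLeft(AvgAcc(0L, 0L))(merge))

  def main(args: Array[String]): Unit = {
    val partials = Seq(Seq(1L, 2L, 3L), Seq(4L, 5L)).map(preAggregate)
    println(finalAggregate(partials)) // prints 3.0
  }
}
```

Because `accumulate` works on raw input values, no separate mapping step is needed to turn each input row into an intermediate row before the combiner runs.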

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink tableRmPrepareMap

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3472.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3472





