[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979375#comment-15979375
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3735


> codeGen DataSet Goupingwindow Aggregates
> 
>
> Key: FLINK-6242
> URL: https://issues.apache.org/jira/browse/FLINK-6242
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shaoxuan Wang
>Assignee: Shaoxuan Wang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15979261#comment-15979261
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3735
  
Thanks @shaoxuan-wang!
Merging




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978886#comment-15978886
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3735
  
@fhueske, your changes look good to me. I left a few comments.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976603#comment-15976603
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3735
  
Hi @shaoxuan-wang, thanks for the PR. The changes look good.
I opened a PR against your PR branch and refactored the 
`CodeGenerator.generateAggregations()` method a bit. Among other things, I added 
code-gen for grouping sets.

Let me know what you think.

Best, Fabian




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15974244#comment-15974244
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3735
  
Hi @shaoxuan-wang, I'm fine with either approach: a single method with 
additional parameters or multiple methods. If you think the multiple-methods 
approach is better, let's go for it.

Thanks, Fabian




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973846#comment-15973846
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user shaoxuan-wang commented on the issue:

https://github.com/apache/flink/pull/3735
  
@fhueske, thanks for your feedback.
Yes, we could keep the `GeneratedAggregations` interface very clean, as follows:
```
abstract class GeneratedAggregations extends Function {
 def setAggregationResults(accumulators: Row, output: Row)
 def setForwardedFields(input: Row, output: Row)
 def accumulate(accumulators: Row, input: Row)
 def retract(accumulators: Row, input: Row)
 def createAccumulators(): Row
 def mergeAccumulatorsPair(a: Row, b: Row): Row
 def resetAccumulator(accumulators: Row)
}
```
However, I would rather not add more parameters to the code-generation 
function, because callers would often have to construct unnecessary empty 
arguments. I think we can split the code-generation function into 2-3 
functions. These would only be entry points that process the code-gen 
parameters; the underlying implementation would be shared. Let me prototype 
the changes, and we can continue the discussion from there.

Regarding your other comments: I did not look into the logic of the previous 
implementations, as I focused only on the code-gen. I will take a look and 
optimize them.
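
The idea of several entry points over one shared implementation could look 
roughly like the sketch below. It is only an illustration under stated 
assumptions, not the code in the PR: `AggCodeGenSketch`, `generateImpl`, 
`generateForGroupAggregate` and `generateForWindowMerge` are hypothetical 
names, and the "generated code" is reduced to a descriptive string.

```scala
// Hypothetical sketch: thin entry points over one shared code-gen implementation.
object AggCodeGenSketch {

  // Shared implementation: accepts every parameter any caller might need.
  private def generateImpl(
      name: String,
      aggMapping: Array[Int],
      fwdMapping: Array[(Int, Int)],
      keyOffset: Int): String = {
    val fwd = fwdMapping.map { case (in, out) => s"$in->$out" }.mkString(",")
    val agg = aggMapping.map(_ + keyOffset).mkString(",")
    s"$name[fwd=$fwd;agg=$agg]"
  }

  // Entry point for callers that forward fields and need no key offset.
  def generateForGroupAggregate(
      name: String,
      aggMapping: Array[Int],
      fwdMapping: Array[(Int, Int)]): String =
    generateImpl(name, aggMapping, fwdMapping, keyOffset = 0)

  // Entry point for callers whose rows are laid out as (keys, accumulators).
  def generateForWindowMerge(
      name: String,
      aggMapping: Array[Int],
      keyCount: Int): String =
    generateImpl(name, aggMapping, Array.empty[(Int, Int)], keyOffset = keyCount)
}
```

With this shape, a caller never builds empty arguments itself; each entry 
point supplies the defaults for the parameters it does not expose.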




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973424#comment-15973424
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002408
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -38,74 +38,58 @@ import org.apache.flink.util.{Collector, Preconditions}
   * it does no final aggregate evaluation. It also includes the logic of
   * [[DataSetSlideTimeWindowAggFlatMapFunction]].
   *
-  * @param aggregates aggregate functions
-  * @param groupingKeysLength number of grouping keys
-  * @param timeFieldPos position of aligned time field
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param windowSize window size of the sliding window
   * @param windowSlide window slide of the sliding window
   * @param returnType return type of this function
   */
 class DataSetSlideTimeWindowAggReduceGroupFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val groupingKeysLength: Int,
-private val timeFieldPos: Int,
+private val genAggregations: GeneratedAggregationsFunction,
+private val keysAndAggregatesArity: Int,
 private val windowSize: Long,
 private val windowSlide: Long,
 @transient private val returnType: TypeInformation[Row])
   extends RichGroupReduceFunction[Row, Row]
   with CombineFunction[Row, Row]
-  with ResultTypeQueryable[Row] {
+  with ResultTypeQueryable[Row]
+  with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
+  private val timeFieldPos = returnType.getArity - 1
+  private val intermediateWindowStartPos = keysAndAggregatesArity
 
   protected var intermediateRow: Row = _
-  // add one field to store window start
-  protected val intermediateRowArity: Int = groupingKeysLength + 
aggregates.length + 1
-  protected val accumulatorList: Array[JArrayList[Accumulator]] = 
Array.fill(aggregates.length) {
-new JArrayList[Accumulator](2)
-  }
-  private val intermediateWindowStartPos: Int = intermediateRowArity - 1
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-intermediateRow = new Row(intermediateRowArity)
-
-// init lists with two empty accumulators
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).add(accumulator)
-  accumulatorList(i).add(accumulator)
-  i += 1
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+accumulators = function.createAccumulators()
+intermediateRow = function.createOutputRow()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).set(0, accumulator)
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
 
   // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // trigger tumbling evaluation
   if (!iterator.hasNext) {
--- End diff --

Move this check after the loop.



[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973421#comment-15973421
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111999493
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
 ---
@@ -110,12 +110,8 @@ class DataSetSessionWindowAggregatePreProcessor(
 var windowEnd: java.lang.Long = null
--- End diff --

Move the implementation to `combine()` and forward the `mapPartition()` call 
to `combine()`.
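
A minimal sketch of that forwarding pattern is shown below. The traits are 
simplified stand-ins for Flink's `Collector`, `MapPartitionFunction` and 
`GroupCombineFunction` so that the example compiles without Flink on the 
classpath; `SumPreProcessor` and `BufferCollector` are hypothetical names, and 
summing integers stands in for the real pre-aggregation logic.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for the Flink interfaces (assumption: same method shapes).
trait Collector[T] { def collect(record: T): Unit }
trait MapPartitionFunction[I, O] {
  def mapPartition(values: Iterable[I], out: Collector[O]): Unit
}
trait GroupCombineFunction[I, O] {
  def combine(values: Iterable[I], out: Collector[O]): Unit
}

// Test helper that stores everything it collects.
class BufferCollector[T] extends Collector[T] {
  val buffer: ArrayBuffer[T] = ArrayBuffer.empty[T]
  override def collect(record: T): Unit = { buffer += record; () }
}

// Hypothetical pre-processor: the logic lives in combine() only.
class SumPreProcessor
  extends MapPartitionFunction[Int, Int]
  with GroupCombineFunction[Int, Int] {

  override def combine(values: Iterable[Int], out: Collector[Int]): Unit =
    out.collect(values.sum)

  // mapPartition() has the same contract here, so it simply forwards.
  override def mapPartition(values: Iterable[Int], out: Collector[Int]): Unit =
    combine(values, out)
}
```

Keeping a single implementation means a later fix in `combine()` applies to 
both call paths.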




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973412#comment-15973412
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111986866
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction 
baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) 
${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
  |  baseClass$i.getValue((${accTypes(i)}) 
accs.getField($i)));""".stripMargin
   }.mkString("\n")
 
-  j"""$sig {
- |$setAggs
+  val setAggregationResults: String =
+j"""
+   |  public void setAggregationResults(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output) {
+   |setAggregationResultsHelper(accs, output, 0);
--- End diff --

Code generated methods should be as "flat" as possible. Calling other 
helper methods adds overhead compared to inlining the code.
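
One way to keep the generated method flat is to resolve the offset while 
generating the code, so that the emitted Java source contains the final field 
index and no helper call. The sketch below illustrates this under simplifying 
assumptions: `FlatCodeGenSketch` is a hypothetical name, and the emitted 
source is abbreviated (no fully qualified class names, no accumulator casts).

```scala
// Hypothetical sketch: bake the offset into the generated source at generation time.
object FlatCodeGenSketch {

  // Emits one `output.setField(...)` statement per aggregate. The target index
  // is computed here, so the generated method body needs no helper call.
  def genSetAggregationResults(aggMapping: Array[Int], offset: Int): String = {
    val body = aggMapping.indices.map { i =>
      s"    output.setField(${aggMapping(i) + offset}, agg$i.getValue(accs.getField($i)));"
    }.mkString("\n")

    s"""  public void setAggregationResults(Row accs, Row output) {
       |$body
       |  }""".stripMargin
  }
}
```

Calling `genSetAggregationResults(Array(0, 1), 2)` yields a method whose two 
statements write to fields 2 and 3 directly.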




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973425#comment-15973425
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112007816
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

Actually, I'm not sure we really need to implement a different 
code-generation function. I had a look at the code-generation code and think 
that we could just add a few more parameters to the current code-gen method. 
Right now, the behavior of most generated methods can be defined exactly:

- `createAccumulators()`: generates a `Row` with the accumulators for each 
  provided `AggregateFunction`. Some methods of `GeneratedAggregations` expect 
  a Row of accumulators with exactly this layout as one of their input 
  parameters. In the following, this parameter is called `accs`.
- `accumulate(accs, row)`: the `aggFields` parameter controls which fields 
  of `row` are accumulated into which accumulator. We should rename this 
  parameter to `accFields` though, IMO.
- `retract(accs, row)`: same as for `accumulate`. We should add a separate 
  parameter `retractFields: Array[Int]` though.
- `setForwardedFields(input, output)`: the `fwdMapping` parameter controls 
  which field of the input row is copied to which position of the output row. 
  We could add an optional parameter to copy the `groupSetMapping` to the 
  output as well.
- `setAggregationResults(accs, output)`: the `aggMapping` parameter 
  controls to which output fields the aggregation results are copied. If we 
  add another parameter `partialResults: Boolean`, we can control whether to 
  copy final results (`AggregateFunction.getValue()`) or partial results (the 
  accumulator).
- `createOutputRow()`: the `outputArity` parameter specifies the arity of 
  the output row.
- `mergeAccumulatorsPair(accs, other)`: **This is the only inflexible 
  method**. We could change the behavior of the method as follows: the method 
  expects as its first parameter (`accs`) a Row with the same layout as 
  generated by `createAccumulators()`. The second parameter can be any row 
  with accumulators at arbitrary positions. To enable the merging, we add a 
  parameter `mergeMapping: Array[Int]` to the code-generating function, which 
  defines which fields of the `other` parameter are merged with the fields in 
  the `accs` Row. The method returns a Row with the default layout (as 
  generated by `createAccumulators()`).
- `resetAccumulator(accs)`: resets a Row of accumulators of the known 
  layout.

I haven't checked this thoroughly, but I think with these parameters, we 
can control the generated code sufficiently to support all aggregation 
operators for DataSet and DataStream, i.e., we can generate the currently 
existing functions such that they behave as the more specialized ones that you 
added. Since all code-gen parameters (`accFields`, `retractFields`, 
`fwdMapping`, `groupSetMapping`, `aggMapping`, `partialResults`, `outputArity`, 
`mergeMapping`) can be set independently for each type of operator, this should 
give us the flexibility for all types of operators. We only need to 
parameterize the code-generation method appropriately. 
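
To make the proposed `mergeMapping` semantics concrete, here is a small 
sketch of what a generated `mergeAccumulatorsPair` would do at runtime. It 
rests on assumptions that are not part of the PR: `Row` is modeled as 
`Array[Any]`, every accumulator is a plain `Long` sum, and `mergeMapping` is 
passed at runtime rather than baked into generated code.

```scala
// Hypothetical sketch of the proposed mergeMapping semantics.
object MergeMappingSketch {
  type Row = Array[Any] // stand-in for org.apache.flink.types.Row

  // `accs` has the default layout (accumulator i at field i). `other` may hold
  // its accumulators anywhere; mergeMapping(i) names the field of `other`
  // that is merged into accumulator i.
  def mergeAccumulatorsPair(accs: Row, other: Row, mergeMapping: Array[Int]): Row = {
    var i = 0
    while (i < mergeMapping.length) {
      val left = accs(i).asInstanceOf[Long]
      val right = other(mergeMapping(i)).asInstanceOf[Long]
      accs(i) = left + right // a sum accumulator merges by addition
      i += 1
    }
    accs
  }
}
```

For an `other` row laid out as (key, acc0, acc1), passing 
`mergeMapping = Array(1, 2)` covers the "with key offset" case without a 
second generated method.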

In addition, we could make all parameters `Option`s and generate empty 
methods if the parameters for a function are not set. (This could also be a 
follow-up issue, IMO.)
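
A rough sketch of the `Option` idea, for a single generated method, might 
look like this. `OptionalCodeGenSketch` is a hypothetical name and the emitted 
Java source is abbreviated; the point is only that a missing parameter leads 
to an empty method body instead of a different generator.

```scala
// Hypothetical sketch: an unset parameter produces an empty generated method.
object OptionalCodeGenSketch {

  def genSetForwardedFields(fwdMapping: Option[Array[(Int, Int)]]): String = {
    val body = fwdMapping match {
      case Some(mapping) =>
        mapping.map { case (in, out) =>
          s"    output.setField($out, input.getField($in));"
        }.mkString("\n")
      case None => "" // parameter not set: nothing to forward
    }
    s"  public void setForwardedFields(Row input, Row output) {\n$body\n  }"
  }
}
```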

What do you think @shaoxuan-wang ?





[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973423#comment-15973423
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002140
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

make `record` a `var` and declare it outside of the loop.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973418#comment-15973418
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112001966
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceCombineFunction.scala
 ---
@@ -30,78 +30,46 @@ import org.apache.flink.types.Row
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between 
intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list 
and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last 
field of output row
   * @param finalRowWindowEndPos relative window-end position to last field 
of output row
   * @param windowSize size of the window, used to determine window-end for 
output row
   */
 class DataSetSlideWindowAggReduceCombineFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
   extends DataSetSlideWindowAggReduceGroupFunction(
-aggregates,
-groupKeysMapping,
-aggregateMapping,
-finalRowArity,
+genAggregations,
+keysAndAggregatesArity,
 finalRowWindowStartPos,
 finalRowWindowEndPos,
 windowSize)
   with CombineFunction[Row, Row] {
 
-  private val intermediateRowArity: Int = groupKeysMapping.length + 
aggregateMapping.length + 1
-  private val intermediateRow: Row = new Row(intermediateRowArity)
+  private val intermediateRow: Row = new Row(keysAndAggregatesArity + 1)
 
   override def combine(records: Iterable[Row]): Row = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  // accumulate
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupKeysMapping.length + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

Move this check after the loop so that the condition is not evaluated in the 
loop body on every iteration.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973416#comment-15973416
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995370
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -68,23 +72,16 @@ class DataSetAggFunction(
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 
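
The suggested restructuring can be sketched as follows. This is an 
illustration rather than the PR's code: `LoopSketch` and both method names are 
hypothetical, records are modeled as `(key, value)` pairs, and a running sum 
stands in for the accumulators.

```scala
// Hypothetical sketch of moving the "last record" handling out of the loop.
object LoopSketch {

  // Before: `iterator.hasNext` is checked a second time in every iteration.
  def sumAndLastInLoop(records: Iterable[(String, Int)]): Option[(String, Int)] = {
    var result: Option[(String, Int)] = None
    var sum = 0
    val iterator = records.iterator
    while (iterator.hasNext) {
      val record = iterator.next()
      sum += record._2
      if (!iterator.hasNext) {
        result = Some((record._1, sum))
      }
    }
    result
  }

  // After: `record` is a var declared outside the loop, so the last element is
  // still available once the loop has terminated.
  def sumAndLastAfterLoop(records: Iterable[(String, Int)]): Option[(String, Int)] = {
    var record: (String, Int) = null
    var sum = 0
    val iterator = records.iterator
    while (iterator.hasNext) {
      record = iterator.next()
      sum += record._2
    }
    // null means the input was empty, in which case nothing is emitted.
    if (record != null) Some((record._1, sum)) else None
  }
}
```

The null guard is only there to keep the sketch safe for empty input; whether 
the real functions can ever see an empty group is not stated in this thread.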




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973426#comment-15973426
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111991648
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -451,13 +581,34 @@ class CodeGenerator(
   {
 for (i <- accTypes.indices) yield
   j"""
- |accList$i = new java.util.ArrayList<${accTypes(i)}>(2);
+ |accList$i = new java.util.ArrayList<${accTypes(i)}>();
--- End diff --

Why not create the `ArrayList` with an initial capacity of 2?
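
For context, a small sketch of the pattern in question, written in Scala 
rather than as generated Java. `PairListSketch` and `newPairList` are 
hypothetical names. The assumption is that the list only ever holds two 
entries (the running accumulator and the incoming one), as in the hand-written 
code that this PR replaces.

```scala
import java.util.{ArrayList => JArrayList}

// Hypothetical sketch: a two-slot list used for pairwise accumulator merging.
object PairListSketch {

  // An initial capacity of 2 sizes the backing array for exactly the two
  // entries the list will hold, so it never has to grow.
  def newPairList[T](first: T, second: T): JArrayList[T] = {
    val list = new JArrayList[T](2)
    list.add(first)
    list.add(second)
    list
  }
}
```

Later merges would then overwrite the slots with `set(0, ...)` and 
`set(1, ...)` instead of adding new entries.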




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973415#comment-15973415
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideTimeWindowAggReduceGroupFunction.scala
 ---
@@ -145,44 +121,23 @@ class DataSetSlideTimeWindowAggReduceGroupFunction(
   override def combine(records: Iterable[Row]): Row = {
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
+
 while (iterator.hasNext) {
   val record = iterator.next()
 
-  i = 0
-  while (i < aggregates.length) {
-// insert received accumulator into acc list
-val newAcc = record.getField(groupingKeysLength + 
i).asInstanceOf[Accumulator]
-accumulatorList(i).set(1, newAcc)
-// merge acc list
-val retAcc = aggregates(i).merge(accumulatorList(i))
-// insert result into acc list
-accumulatorList(i).set(0, retAcc)
-i += 1
-  }
+  function.mergeAccumulatorsPairWithKeyOffset(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

Move this check after the loop.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973414#comment-15973414
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111994938
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

we can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields after the loop has terminated. 




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973427#comment-15973427
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112003763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala
 ---
@@ -25,58 +25,56 @@ import 
org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
-import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
-
+import org.slf4j.LoggerFactory
 
 /**
   * This map function only works for windows on batch tables.
   * It appends an (aligned) rowtime field to the end of the output row.
+  *
+  * @param genAggregations  Code-generated [[GeneratedAggregations]]
+  * @param timeFieldPos Time field position in input row
+  * @param tumbleTimeWindowSize The size of tumble time window
   */
 class DataSetWindowAggMapFunction(
-private val aggregates: Array[AggregateFunction[_]],
-private val aggFields: Array[Array[Int]],
-private val groupingKeys: Array[Int],
-private val timeFieldPos: Int, // time field position in input row
+private val genAggregations: GeneratedAggregationsFunction,
+private val timeFieldPos: Int,
 private val tumbleTimeWindowSize: Option[Long],
 @transient private val returnType: TypeInformation[Row])
-  extends RichMapFunction[Row, Row] with ResultTypeQueryable[Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggFields)
-  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  extends RichMapFunction[Row, Row]
+with ResultTypeQueryable[Row]
+with Compiler[GeneratedAggregations] {
 
   private var output: Row = _
-  // add one more arity to store rowtime
-  private val partialRowLength = groupingKeys.length + aggregates.length + 1
-  // rowtime index in the buffer output row
-  private val rowtimeIndex: Int = partialRowLength - 1
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-output = new Row(partialRowLength)
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getRuntimeContext.getUserCodeClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
   }
 
   override def map(input: Row): Row = {
 
-var i = 0
-while (i < aggregates.length) {
-  val agg = aggregates(i)
-  val fieldValue = input.getField(aggFields(i)(0))
-  val accumulator = agg.createAccumulator()
-  agg.accumulate(accumulator, fieldValue)
-  output.setField(groupingKeys.length + i, accumulator)
-  i += 1
-}
+function.createAccumulatorsAndSetToOutput(output)
--- End diff --

Could we create the accumulators with `function.createAccumulators()` once in 
`open()`, reset them here, and copy them to `output` with 
`function.setAggregationResults()`?




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973428#comment-15973428
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r112002707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala
 ---
@@ -18,111 +18,77 @@
 package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
-import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.util.Collector
+import org.slf4j.LoggerFactory
 
 /**
   * It wraps the aggregate logic inside of
   * [[org.apache.flink.api.java.operators.GroupReduceOperator]].
   *
   * It is used for sliding on batch for both time and count-windows.
   *
-  * @param aggregates aggregate functions.
-  * @param groupKeysMapping index mapping of group keys between intermediate aggregate Row
-  * and output Row.
-  * @param aggregateMapping index mapping between aggregate function list and aggregated value
-  * index in output Row.
-  * @param finalRowArity output row field count
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
+  * @param keysAndAggregatesArity The total arity of keys and aggregates
   * @param finalRowWindowStartPos relative window-start position to last field of output row
   * @param finalRowWindowEndPos relative window-end position to last field of output row
   * @param windowSize size of the window, used to determine window-end for output row
   */
 class DataSetSlideWindowAggReduceGroupFunction(
-aggregates: Array[AggregateFunction[_ <: Any]],
-groupKeysMapping: Array[(Int, Int)],
-aggregateMapping: Array[(Int, Int)],
-finalRowArity: Int,
+genAggregations: GeneratedAggregationsFunction,
+keysAndAggregatesArity: Int,
 finalRowWindowStartPos: Option[Int],
 finalRowWindowEndPos: Option[Int],
 windowSize: Long)
-  extends RichGroupReduceFunction[Row, Row] {
-
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(groupKeysMapping)
+  extends RichGroupReduceFunction[Row, Row]
+with Compiler[GeneratedAggregations] {
 
   private var collector: TimeWindowPropertyCollector = _
+  protected val windowStartPos: Int = keysAndAggregatesArity
+
   private var output: Row = _
-  private val accumulatorStartPos: Int = groupKeysMapping.length
-  protected val windowStartPos: Int = accumulatorStartPos + aggregates.length
+  protected var accumulators: Row = _
 
-  val accumulatorList: Array[JArrayList[Accumulator]] = Array.fill(aggregates.length) {
-new JArrayList[Accumulator](2)
-  }
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  protected var function: GeneratedAggregations = _
 
   override def open(config: Configuration) {
-output = new Row(finalRowArity)
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
+accumulators = function.createAccumulators()
 collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos)
-
-// init lists with two empty accumulators
-var i = 0
-while (i < aggregates.length) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulatorList(i).add(accumulator)
-  accumulatorList(i).add(accumulator)
-  i += 1
-}
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
 
-// reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulatorList(i).get(0))
-  i += 1
-}
+// reset accumulator
+function.resetAccumulator(accumulators)
 
 val iterator = records.iterator()
 while (iterator.hasNext) {
   

[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973417#comment-15973417
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111993502
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
--- End diff --

Could we create the accumulators once and reset them here with 
`function.resetAccumulators()`?




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973422#comment-15973422
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111992686
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -21,44 +21,48 @@ import java.lang.Iterable
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute aggregates that do not support 
pre-aggregation for batch
   * (DataSet) queries.
   *
-  * @param aggregates The aggregate functions.
-  * @param aggInFields The positions of the aggregation input fields.
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
   * @param gkeyOutMapping The mapping of group keys between input and 
output positions.
-  * @param aggOutMapping  The mapping of aggregates to output positions.
   * @param groupingSetsMapping The mapping of grouping set keys between 
input and output positions.
-  * @param finalRowArity The arity of the final resulting row.
   */
 class DataSetAggFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val aggInFields: Array[Array[Int]],
-private val aggOutMapping: Array[(Int, Int)],
+private val genAggregations: GeneratedAggregationsFunction,
 private val gkeyOutMapping: Array[(Int, Int)],
--- End diff --

It would be good if we could parameterize the method that generates the 
code so that the grouping keys and the grouping set flags are copied by 
`GeneratedAggregations.setForwardFields()`. This should be possible because the 
grouping set part only sets constant boolean flags at certain positions in the 
output Row.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973420#comment-15973420
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111995303
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala
 ---
@@ -19,88 +19,71 @@
 package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
-import java.util.{ArrayList => JArrayList}
 
 import org.apache.flink.api.common.functions.RichGroupReduceFunction
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
 import org.apache.flink.types.Row
 import org.apache.flink.util.{Collector, Preconditions}
+import org.slf4j.LoggerFactory
 
 /**
   * [[RichGroupReduceFunction]] to compute the final result of a 
pre-aggregated aggregation
   * for batch (DataSet) queries.
   *
-  * @param aggregates The aggregate functions.
-  * @param aggOutFields The positions of the aggregation results in the 
output
+  * @param genAggregations Code-generated [[GeneratedAggregations]]
   * @param gkeyOutFields The positions of the grouping keys in the output
   * @param groupingSetsMapping The mapping of grouping set keys between 
input and output positions.
-  * @param finalRowArity The arity of the final resulting row
   */
 class DataSetFinalAggFunction(
-private val aggregates: Array[AggregateFunction[_ <: Any]],
-private val aggOutFields: Array[Int],
+private val genAggregations: GeneratedAggregationsFunction,
 private val gkeyOutFields: Array[Int],
-private val groupingSetsMapping: Array[(Int, Int)],
-private val finalRowArity: Int)
-  extends RichGroupReduceFunction[Row, Row] {
+private val groupingSetsMapping: Array[(Int, Int)])
+  extends RichGroupReduceFunction[Row, Row]
+with Compiler[GeneratedAggregations] {
 
-  Preconditions.checkNotNull(aggregates)
-  Preconditions.checkNotNull(aggOutFields)
   Preconditions.checkNotNull(gkeyOutFields)
   Preconditions.checkNotNull(groupingSetsMapping)
 
   private var output: Row = _
+  private var accumulators: Row = _
+
+  val LOG = LoggerFactory.getLogger(this.getClass)
+  private var function: GeneratedAggregations = _
 
   private val intermediateGKeys: Option[Array[Int]] = if (!groupingSetsMapping.isEmpty) {
 Some(gkeyOutFields)
   } else {
 None
   }
 
-  private val numAggs = aggregates.length
-  private val numGKeys = gkeyOutFields.length
-
-  private val accumulators: Array[JArrayList[Accumulator]] =
-Array.fill(numAggs)(new JArrayList[Accumulator](2))
-
   override def open(config: Configuration) {
-output = new Row(finalRowArity)
-
-// init lists with two empty accumulators
-for (i <- aggregates.indices) {
-  val accumulator = aggregates(i).createAccumulator()
-  accumulators(i).add(accumulator)
-  accumulators(i).add(accumulator)
-}
+LOG.debug(s"Compiling AggregateHelper: $genAggregations.name \n\n " +
+s"Code:\n$genAggregations.code")
+val clazz = compile(
+  getClass.getClassLoader,
+  genAggregations.name,
+  genAggregations.code)
+LOG.debug("Instantiating AggregateHelper.")
+function = clazz.newInstance()
+
+output = function.createOutputRow()
+accumulators = function.createAccumulators()
   }
 
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
 
 val iterator = records.iterator()
 
 // reset first accumulator
-var i = 0
-while (i < aggregates.length) {
-  aggregates(i).resetAccumulator(accumulators(i).get(0))
-  i += 1
-}
+function.resetAccumulator(accumulators)
 
+var i = 0
 while (iterator.hasNext) {
   val record = iterator.next()
--- End diff --

We can make `record` a `var` and move its definition outside of the loop.
Then we can get rid of the `if (!iterator.hasNext)` check in the body of 
the while loop and set the `output` fields once, after the loop has terminated.



[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973413#comment-15973413
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111990633
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -263,33 +263,56 @@ class CodeGenerator(
  aggFields: Array[Array[Int]],
  aggMapping: Array[Int],
  fwdMapping: Array[(Int, Int)],
- outputArity: Int)
+ outputArity: Int,
+ groupingKeys: Array[Int])
   : GeneratedAggregationsFunction = {
 
 def genSetAggregationResults(
   accTypes: Array[String],
   aggs: Array[String],
   aggMapping: Array[Int]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-|  public void setAggregationResults(
-|org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row output)""".stripMargin
+   |  private final void setAggregationResultsHelper(
+   |org.apache.flink.types.Row accs,
+   |org.apache.flink.types.Row output,
+   |java.lang.Integer offset)""".stripMargin
 
-  val setAggs: String = {
+  val setAggsHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |org.apache.flink.table.functions.AggregateFunction baseClass$i =
  |  (org.apache.flink.table.functions.AggregateFunction) ${aggs(i)};
  |
  |output.setField(
- |  ${aggMapping(i)},
+ |  ${aggMapping(i)} + offset,
--- End diff --

`${aggMapping(i)} + offset` -> `${aggMapping(i) + offset}` to add the 
constant `offset` to the mapping before generating the code.
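
The difference between the two interpolation forms can be seen in a small standalone snippet. It uses Scala's standard `s` interpolator rather than the `j` interpolator from the diff, and the values of `aggMapping`, `offset`, and `i` are made up for illustration; it also assumes that `offset` is known to the generator at code-generation time.

```scala
val aggMapping = Array(0, 1)
val offset = 2 // hypothetical key offset, assumed to be known at code-generation time
val i = 1

// Only the mapping is interpolated; the generated Java code performs the
// addition at runtime and needs an `offset` variable in scope.
val runtimeAdd = s"output.setField(${aggMapping(i)} + offset, value);"
// runtimeAdd == "output.setField(1 + offset, value);"

// The sum is computed by the generator; the generated Java code contains
// only the resulting constant.
val constantFolded = s"output.setField(${aggMapping(i) + offset}, value);"
// constantFolded == "output.setField(3, value);"
```

With the second form the generated method no longer needs an `offset` parameter at all.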




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973411#comment-15973411
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111991399
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -407,32 +513,56 @@ class CodeGenerator(
 accTypes: Array[String],
 aggs: Array[String]): String = {
 
-  val sig: String =
+  val sigHelper: String =
 j"""
-   |  public org.apache.flink.types.Row mergeAccumulatorsPair(
+   |  public final org.apache.flink.types.Row mergeAccumulatorsPairHelper(
|org.apache.flink.types.Row a,
-   |org.apache.flink.types.Row b)
+   |org.apache.flink.types.Row b,
+   |java.lang.Integer offset)
""".stripMargin
-  val merge: String = {
+  val mergeHelper: String = {
 for (i <- aggs.indices) yield
   j"""
  |${accTypes(i)} aAcc$i = (${accTypes(i)}) a.getField($i);
- |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i);
+ |${accTypes(i)} bAcc$i = (${accTypes(i)}) b.getField($i + offset);
--- End diff --

`b.getField($i + offset)` -> `b.getField(${i + offset})`




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973419#comment-15973419
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111994200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala
 ---
@@ -64,38 +68,22 @@ class DataSetPreAggFunction(
   def preaggregate(records: Iterable[Row], out: Collector[Row]): Unit = {
--- End diff --

I think we can move the implementation of `preaggregate()` to `combine()` 
and let `mapPartition()` call `combine()`. `combine()` is called once per group 
of records with the same key, whereas `mapPartition()` is called just once (for 
the whole partition). This way we can remove some overhead from `combine()`.
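
A rough sketch of that delegation, without any Flink dependencies: `PreAggSketch` is a hypothetical stand-in for `DataSetPreAggFunction`, and the summing logic and `ArrayBuffer` collector are placeholders for the real pre-aggregation and `Collector[Row]`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DataSetPreAggFunction.
class PreAggSketch {

  // Called once per group of records with the same key, so it is the hot path
  // and holds the implementation directly, without an extra indirection.
  def combine(records: Iterable[Int], out: ArrayBuffer[Int]): Unit = {
    var sum = 0
    val iterator = records.iterator
    while (iterator.hasNext) {
      sum += iterator.next()
    }
    out += sum
  }

  // Called once for the whole partition, so one additional method call
  // is negligible here.
  def mapPartition(records: Iterable[Int], out: ArrayBuffer[Int]): Unit =
    combine(records, out)
}
```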




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972920#comment-15972920
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user shaoxuan-wang commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111986331
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

This sounds like a very good idea. I had actually thought about merging the 
`*WithKeyOffset` functions into the existing functions. It works for most 
functions, but `setAggregationResults` is a little tricky: `accumulate` and 
`setAggregationResults` do not need the key offset, but `merge` does.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972908#comment-15972908
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111984555
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -97,12 +94,7 @@ class DataSetAggFunction(
 }
 
 // set agg results to output
-i = 0
-while (i < aggOutMapping.length) {
-  val (out, in) = aggOutMapping(i)
-  output.setField(out, aggregates(in).getValue(accumulators(in)))
-  i += 1
-}
+function.setAggregationResults(accumulators, output)
 
 // set grouping set flags to output
 if (intermediateGKeys.isDefined) {
--- End diff --

I think this should eventually be integrated with `setForwardFields()` as 
well. 
For now, we might leave it as it is.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972905#comment-15972905
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111983678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -68,23 +72,16 @@ class DataSetAggFunction(
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
 
 val iterator = records.iterator()
 
 while (iterator.hasNext) {
   val record = iterator.next()
+  var i = 0
 
   // accumulate
-  i = 0
-  while (i < aggregates.length) {
-aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0)))
-i += 1
-  }
+  function.accumulate(accumulators, record)
 
   // check if this record is the last record
   if (!iterator.hasNext) {
--- End diff --

Couldn't we use `function.setForwardFields()` to forward the grouping keys 
to the output?




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972901#comment-15972901
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111983391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala
 ---
@@ -68,23 +72,16 @@ class DataSetAggFunction(
   override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
 
 // create accumulators
-var i = 0
-while (i < aggregates.length) {
-  accumulators(i) = aggregates(i).createAccumulator()
-  i += 1
-}
+accumulators = function.createAccumulators()
--- End diff --

We could create the accumulators once and use 
`function.resetAccumulators()` to reset and reuse the object.
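
The create-once, reset-per-group pattern could look roughly like this. All classes here are hypothetical simplifications: `AggSketch` stands in for the generated `GeneratedAggregations` instance, and `SumAcc` for an accumulator stored in a `Row`.

```scala
// Hypothetical mutable accumulator.
final class SumAcc { var sum: Long = 0L }

// Hypothetical stand-in for the code-generated aggregation helper.
class AggSketch {
  def createAccumulators(): Array[SumAcc] = Array(new SumAcc)
  def resetAccumulators(accs: Array[SumAcc]): Unit = accs.foreach(a => a.sum = 0L)
  def accumulate(accs: Array[SumAcc], value: Long): Unit = accs(0).sum += value
}

// Hypothetical stand-in for the reduce function.
class ReduceSketch(function: AggSketch) {
  private var accumulators: Array[SumAcc] = null

  // Allocate the accumulators a single time when the function is opened.
  def open(): Unit = {
    accumulators = function.createAccumulators()
  }

  // Reuse the same objects for every group; only their state is reset.
  def reduce(records: Iterable[Long]): Long = {
    function.resetAccumulators(accumulators)
    records.foreach(r => function.accumulate(accumulators, r))
    accumulators(0).sum
  }
}
```

This avoids allocating new accumulator objects for every group that passes through `reduce()`.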




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972896#comment-15972896
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111982297
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

We could reuse all your code, but just put it into a different method of 
the `CodeGenerator` and make it implement the existing methods. Their 
interfaces are the same.




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972869#comment-15972869
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3735#discussion_r111978489
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -36,6 +36,15 @@ abstract class GeneratedAggregations extends Function {
   def setAggregationResults(accumulators: Row, output: Row)
 
   /**
+* Calculates the results from accumulators, and set the results to the 
output (with key offset)
+*
+* @param accumulators the accumulators (saved in a row) which contains 
the current
+* aggregated results
+* @param output   output results collected in a row
+*/
+  def setAggregationResultsWithKeyOffset(accumulators: Row, output: Row)
--- End diff --

I don't think we need to extend the `GeneratedAggregations` interface 
(except for `resetAccumulators()`).
I would rather implement another code generation function that implements 
the existing methods differently. This would mean adding another method to 
`CodeGenerator` that generates a `GeneratedAggregations` implementation suitable 
for the DataSet aggregations.

- `setAggregationResultsWithKeyOffset` -> `setAggregationResults`
- `setKeyToOutput` -> `setForwardedFields`
- `accumulateWithKeyOffset` -> `accumulate`
- `createAccumulatorsAndSetToOutput` could be replaced by 
`createAccumulators` (called once to create reusable accumulators), 
`resetAccumulators`, and `setAggregationResults` (if it sets the accumulators 
instead of calling `AggFunction.getValue()`, see below)
- `copyAccumulatorsToBuffer` -> `setAggregationResults` (the accumulators 
are partial aggregation results). This would mean we have two behaviors for 
`setAggregationResults()`: setting the final result (`getValue()`) or the 
partial result (the accumulator). A simple flag during code gen would select 
either the final or the partial result.
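
The code-gen flag from the last bullet might be sketched like this. It is a heavily simplified, hypothetical generator: the function signature and the `partialResults` parameter are assumptions for illustration, the standard `s` interpolator replaces the `j` interpolator, and the casts present in the real generated code are omitted.

```scala
// Hypothetical generator for the Java body of setAggregationResults().
// `aggs` holds the names of the aggregate function fields in the generated class.
def genSetAggregationResults(
    aggs: Array[String],
    aggMapping: Array[Int],
    partialResults: Boolean): String = {

  aggs.indices.map { i =>
    val value =
      if (partialResults) {
        // partial result: forward the accumulator itself
        s"accs.getField($i)"
      } else {
        // final result: let the aggregate function compute the value
        s"${aggs(i)}.getValue(accs.getField($i))"
      }
    s"output.setField(${aggMapping(i)}, $value);"
  }.mkString("\n")
}
```

For example, `genSetAggregationResults(Array("sum0"), Array(2), partialResults = true)` yields `output.setField(2, accs.getField(0));`, whereas passing `false` yields `output.setField(2, sum0.getValue(accs.getField(0)));`. The decision is made once by the generator, so the generated method contains no runtime branch.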




[jira] [Commented] (FLINK-6242) codeGen DataSet Goupingwindow Aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972698#comment-15972698
 ] 

ASF GitHub Bot commented on FLINK-6242:
---

GitHub user shaoxuan-wang opened a pull request:

https://github.com/apache/flink/pull/3735

[FLINK-6242] [table] Add code generation for DataSet Aggregates

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/shaoxuan-wang/flink F6242-submit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3735


commit 16e8aa7400f1b9a9f490522427f269fd01a0f640
Author: shaoxuan-wang 
Date:   2017-04-18T13:45:49Z

[FLINK-6242] [table] Add code generation for DataSet Aggregates



