[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2792




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89249046
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

Yes.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247407
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Sure, agreed.
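
For context, the quoted `createIncrementalAggregateReduceFunction` wires the transformed
aggregates into a `ReduceFunction` that combines two intermediate rows pairwise inside a
window pane. A minimal sketch of that reduce step, assuming the `Aggregate` trait's
`merge(intermediate: Row, buffer: Row)` contract from this package (the PR's actual class
may differ in detail):

    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.api.table.Row
    import org.apache.flink.api.table.runtime.aggregate.Aggregate

    // Sketch only: combine two intermediate aggregate rows into one.
    // Within a group the grouping-key fields of both rows are equal, so it
    // suffices to fold the aggregate sections of value1 into value2's buffer.
    class IncrementalAggregateReduceSketch(aggregates: Array[Aggregate[_ <: Any]])
      extends ReduceFunction[Row] {

      override def reduce(value1: Row, value2: Row): Row = {
        aggregates.foreach(_.merge(value1, value2))
        value2
      }
    }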




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247416
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+ 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247362
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-// the mapping relation between aggregate function index in list and its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89247312
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Cool, I think using Calcite's namedAggregates is a good idea.
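
To make the quoted Scaladoc layout concrete: for `groupBy(k1, k2)` with `avg(x)`, `sum(y)`
and `count(z)`, the prepare phase emits a six-field intermediate Row. A small illustration
(field values are placeholders; `Row` is the flink-table class from this package tree):

    import org.apache.flink.api.table.Row

    // Illustration of the intermediate buffer described by the diagram above.
    // avg(x) owns two fields starting at aggOffsetInRow = 2 (sum1, count1),
    // sum(y) one field at aggOffsetInRow = 4 (sum2), and
    // count(z) one field at aggOffsetInRow = 5 (count2).
    val buffer = new Row(6)
    buffer.setField(0, "k1")  // groupKey1
    buffer.setField(1, "k2")  // groupKey2
    buffer.setField(2, 0.0)   // sum1:   running sum for avg(x)
    buffer.setField(3, 0L)    // count1: running count for avg(x)
    buffer.setField(4, 0.0)   // sum2:   running sum for sum(y)
    buffer.setField(5, 0L)    // count2: running count for count(z)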




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89244761
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Yes, I'd do it.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89242998
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101976
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

compute `aggregates` from `namedAggregates`.
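
A sketch of the suggested change, deriving `aggregates` inside the method instead of passing
it in (signatures follow the quoted diff; the camel-cased helper name anticipates the rename
requested in another comment, and the window-function construction is elided):

    // Sketch: recover the transformed aggregate functions from Calcite's
    // named aggregate calls rather than accepting them as a parameter.
    private[flink] def createAllWindowIncrementalAggregationFunction(
        window: LogicalWindow,
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        inputType: RelDataType,
        outputType: RelDataType,
        groupings: Array[Int],
        properties: Seq[NamedWindowProperty])
      : AllWindowFunction[Row, Row, DataStreamWindow] = {

      val (_, aggregates) =
        transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

      val (groupingOffsetMapping, aggOffsetMapping) =
        getGroupingOffsetAndAggOffsetMapping(namedAggregates, inputType, outputType, groupings)

      val finalRowArity = outputType.getFieldCount

      // ... build the time-window or generic incremental window function as in the quoted diff
      ???
    }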




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89089095
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Please fix camel case: `getGroupingOffsetAndAggOffsetMapping`




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088322
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

Let's just use `namedAggregates: Seq[CalcitePair[AggregateCall, String]]` 
and compute `aggregates` and `aggFieldIndexes` from it.
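
A sketch of what that would look like for the prepare function, reusing the helpers already
visible in the quoted diff (illustrative, not the final PR code):

    // Sketch: accept only namedAggregates and recover (aggFieldIndexes,
    // aggregates) internally, as the removed monolithic method did.
    private[flink] def createPrepareMapFunction(
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        groupings: Array[Int],
        inputType: RelDataType): MapFunction[Any, Row] = {

      val (aggFieldIndexes, aggregates) =
        transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

      val mapReturnType: RowTypeInfo =
        createAggregateBufferDataType(groupings, aggregates, inputType)

      new AggregateMapFunction[Row, Row](aggregates, aggFieldIndexes, groupings, mapReturnType)
        .asInstanceOf[MapFunction[Any, Row]]
    }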




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89090109
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088819
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

I think we should make the arguments of the `create*Function` methods more 
consistent. Either we use Calcite's `namedAggregates` or we use the transformed 
`aggregates` and `aggFieldIndexes`.
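
To illustrate the payoff, a hypothetical call site (invented for illustration, assuming the
namedAggregates-only signatures sketched elsewhere in this thread): every helper then takes
the same Calcite-level inputs, and no caller has to unpack the transformed functions first.

    // Hypothetical caller with consistent, namedAggregates-only signatures.
    val mapFunction = AggregateUtil.createPrepareMapFunction(
      namedAggregates, groupings, inputType)
    val reduceFunction = AggregateUtil.createIncrementalAggregateReduceFunction(
      namedAggregates, inputType, outputType, groupings)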




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086247
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-// the mapping relation between aggregate function index in list and its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+mapFunction

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101873
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

Same as above.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89101806
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
+
+// the mapping relation between aggregate function index in list and its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
--- End diff --

I'd generate the `GroupReduceFunction` directly in this method. That will 
make the calling code simpler.
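
A sketch of that suggestion (illustrative; `createAggregateGroupReduceFunction` stands in
for whatever factory builds the group-reduce and is an assumed name, not quoted from the PR):

    // Sketch: build the GroupReduceFunction inside the method so callers only
    // pass the logical description, not a pre-built function object.
    private[flink] def createAllWindowAggregationFunction(
        window: LogicalWindow,
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        inputType: RelDataType,
        outputType: RelDataType,
        groupings: Array[Int],
        properties: Seq[NamedWindowProperty])
      : AllWindowFunction[Row, Row, DataStreamWindow] = {

      // assumed helper that creates the RichGroupReduceFunction[Row, Row]
      val aggFunction = createAggregateGroupReduceFunction(
        namedAggregates, inputType, outputType, groupings)

      if (isTimeWindow(window)) {
        val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
        new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
          .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
      } else {
        new AggregateAllWindowFunction(aggFunction)
      }
    }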




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088166
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
-   *                          |                               |
-   *                          v                               v
-   *   +---------+---------+--------+--------+--------+--------+
-   *   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *   +---------+---------+--------+--------+--------+--------+
-   *                                               ^
-   *                                               |
-   *                      sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2               count(z) aggOffsetInRow = 5
+*                          |                               |
+*                          v                               v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*                                               ^
+*                                               |
+*                      sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
 
 val mapReturnType: RowTypeInfo =
   createAggregateBufferDataType(groupings, aggregates, inputType)
 
 val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
-
-// the mapping relation between field index of intermediate aggregate Row and output Row.
-val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)
-
-// the mapping relation between aggregate function index in list and its corresponding
-// field index in output Row.
-val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
-
-if (groupingOffsetMapping.length != groupings.length ||
-aggOffsetMapping.length != namedAggregates.length) {
-  throw new TableException("Could not find output field in input data type " +
-  "or aggregate functions.")
-}
-
-val allPartialAggregate = aggregates.map(_.supportPartial).forall(x => x)
+  aggregates,
+  aggFieldIndexes,
+  groupings,
+  mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
 
-val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+mapFunction

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89089242
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for incremental aggregates. It implements
+* [[org.apache.flink.api.common.functions.ReduceFunction]].
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
--- End diff --

Can you move this method below the `create*Function` methods and make it 
`private`?
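
Together with the camel-case rename requested in an earlier comment, the moved method would
end up below the `create*Function` block roughly as follows (sketch; body taken from the
quoted diff):

    private def getGroupingOffsetAndAggOffsetMapping(
        namedAggregates: Seq[CalcitePair[AggregateCall, String]],
        inputType: RelDataType,
        outputType: RelDataType,
        groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {

      // mapping between field indexes of the intermediate aggregate Row and the output Row
      val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, groupings)

      // mapping between an aggregate function's index in the list and its output Row field
      val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)

      if (groupingOffsetMapping.length != groupings.length ||
          aggOffsetMapping.length != namedAggregates.length) {
        throw new TableException(
          "Could not find output field in input data type or aggregate functions.")
      }
      (groupingOffsetMapping, aggOffsetMapping)
    }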




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89102066
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): IncrementalAggregateReduceFunction = {
+val groupingOffsetMapping =
+  getGroupingOffsetAndaggOffsetMapping(
+namedAggregates,
+inputType,
+outputType,
+groupings)._1
+val intermediateRowArity = groupings.length + 
aggregates.map(_.intermediateDataType.length).sum
+val reduceFunction = new IncrementalAggregateReduceFunction(
+  aggregates,
+  groupingOffsetMapping,
+  intermediateRowArity)
+reduceFunction
+  }
+
+  /**
+* @return groupingOffsetMapping (mapping relation between field index 
of intermediate
+* aggregate Row and output Row.)
+* and aggOffsetMapping (the mapping relation between aggregate 
function index in list
+* and its corresponding field index in output Row.)
+*/
+  def getGroupingOffsetAndaggOffsetMapping(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int]): (Array[(Int, Int)], Array[(Int, Int)]) = {
+
+// the mapping relation between field index of intermediate aggregate 
Row and output Row.
+val groupingOffsetMapping = getGroupKeysMapping(inputType, outputType, 
groupings)
+
+// the mapping relation between aggregate function index in list and 
its corresponding
+// field index in output Row.
+val aggOffsetMapping = getAggregateMapping(namedAggregates, outputType)
 
-(mapFunction, reduceGroupFunction)
+if (groupingOffsetMapping.length != groupings.length ||
+  aggOffsetMapping.length != namedAggregates.length) {
+  throw new TableException(
+"Could not find output field in input data type " +
+  "or aggregate functions.")
+}
+(groupingOffsetMapping, aggOffsetMapping)
+  }
+
+
+  private[flink] def createAllWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateAllTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]]
+} else {
+  new AggregateAllWindowFunction(aggFunction)
+}
+
+  }
+
+
+  private[flink] def createWindowAggregationFunction(
+window: LogicalWindow,
+properties: Seq[NamedWindowProperty],
+aggFunction: RichGroupReduceFunction[Row, Row])
+  : WindowFunction[Row, Row, Tuple, DataStreamWindow] = {
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new AggregateTimeWindowFunction(aggFunction, startPos, endPos)
+  .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]]
+} else {
+  new AggregateWindowFunction(aggFunction)
+}
+
+  }
+
+  private[flink] def createAllWindowIncrementalAggregationFunction(
+window: LogicalWindow,
+aggregates: Array[Aggregate[_ <: Any]],
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+properties: Seq[NamedWindowProperty])
+  : AllWindowFunction[Row, Row, DataStreamWindow] = {
+
+val (groupingOffsetMapping, aggOffsetMapping) =
+  getGroupingOffsetAndaggOffsetMapping(
+  namedAggregates,
+  inputType,
+  outputType,
+  groupings)
+
+val finalRowArity = outputType.getFieldCount
+
+if (isTimeWindow(window)) {
+  val (startPos, endPos) = computeWindowStartEndPropertyPos(properties)
+  new 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89086106
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -39,66 +45,69 @@ object AggregateUtil {
   type JavaList[T] = java.util.List[T]
 
   /**
-   * Create Flink operator functions for aggregates. It includes 2 implementations of Flink
-   * operator functions:
-   * [[org.apache.flink.api.common.functions.MapFunction]] and
-   * [[org.apache.flink.api.common.functions.GroupReduceFunction]](if it's partial aggregate,
-   * should also implement [[org.apache.flink.api.common.functions.CombineFunction]] as well).
-   * The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
-   * intermediate aggregate values of all aggregate function, it's stored in Row by the following
-   * format:
-   *
-   * {{{
-   *   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
-   * |  |
-   * v  v
-   *+-+-+++++
-   *|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
-   *+-+-+++++
-   *  ^
-   *  |
-   *   sum(y) aggOffsetInRow = 4
-   * }}}
-   *
-   */
-  def createOperatorFunctionsForAggregates(
-  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-  inputType: RelDataType,
-  outputType: RelDataType,
-  groupings: Array[Int])
-: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
-
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+* Create prepare MapFunction for aggregates.
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] contains the
+* intermediate aggregate values of all aggregate function, it's stored in Row by the following
+* format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+* |  |
+* v  v
+*+-+-+++++
+*|groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*+-+-+++++
+*  ^
+*  |
+*   sum(y) aggOffsetInRow = 4
+* }}}
+*
+*/
+  private[flink] def createPrepareMapFunction(
+aggregates: Array[Aggregate[_ <: Any]],
+aggFieldIndexes: Array[Int],
+groupings: Array[Int],
+inputType:
+RelDataType): MapFunction[Any, Row] = {
--- End diff --

remove this line break.
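
For reference, the declaration with the break removed would look like this (a sketch assembled from the surrounding diff, not a tested change):

```scala
private[flink] def createPrepareMapFunction(
  aggregates: Array[Aggregate[_ <: Any]],
  aggFieldIndexes: Array[Int],
  groupings: Array[Int],
  inputType: RelDataType): MapFunction[Any, Row] = {

  val mapReturnType: RowTypeInfo =
    createAggregateBufferDataType(groupings, aggregates, inputType)

  new AggregateMapFunction[Row, Row](
    aggregates,
    aggFieldIndexes,
    groupings,
    mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
}
```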




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89088443
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -115,14 +124,210 @@ object AggregateUtil {
   intermediateRowArity,
   outputType.getFieldCount)
   }
+groupReduceFunction
+  }
+
+  /**
+* Create IncrementalAggregateReduceFunction for Incremental 
aggregates. It implement
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+*
+*/
+  private[flink] def createIncrementalAggregateReduceFunction(
+aggregates: Array[Aggregate[_ <: Any]],
--- End diff --

can we just use `namedAggregates` and compute `aggregates` from it?
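
For illustration, a minimal sketch of that variant, reusing `transformToAggregateFunctions` as it is called elsewhere in this PR (untested; usage assumed from the surrounding diffs):

```scala
private[flink] def createIncrementalAggregateReduceFunction(
  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
  inputType: RelDataType,
  outputType: RelDataType,
  groupings: Array[Int]): IncrementalAggregateReduceFunction = {

  // derive the aggregate functions instead of taking them as a parameter
  val (_, aggregates) =
    transformToAggregateFunctions(namedAggregates.map(_.getKey), inputType, groupings.length)

  val groupingOffsetMapping =
    getGroupingOffsetAndaggOffsetMapping(namedAggregates, inputType, outputType, groupings)._1

  val intermediateRowArity =
    groupings.length + aggregates.map(_.intermediateDataType.length).sum

  new IncrementalAggregateReduceFunction(aggregates, groupingOffsetMapping, intermediateRowArity)
}
```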




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89015695
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
--- End diff --

I'll add a comment that this can be deleted once FLINK-5105 is solved.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89014267
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
--- End diff --

I'm sorry for that. I'll do it.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89014242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
 ---
@@ -52,6 +52,6 @@ class AggregateTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(key,window,input,collector)
--- End diff --

I'm sorry for that. I'll do it.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89012878
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
 ---
@@ -30,9 +30,9 @@ class AggregateMapFunction[IN, OUT](
 private val groupingKeys: Array[Int],
 @transient private val returnType: TypeInformation[OUT])
 extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
-  
--- End diff --

Yes, I will do it.
By the way, can we standardize the declaration style for Class/Object/Trait/Method as
shown in the following examples?

1. Class/Object/Trait: put each constructor argument on its own line, indented four
spaces. If a class/object/trait extends anything, keep the four-space indent for the
constructor arguments and indent the extensions two spaces:

// wrong!
'''
class DataStreamAggregate(
  window: LogicalWindow,
  namedProperties: Seq[NamedWindowProperty],
  cluster: RelOptCluster,
  traitSet: RelTraitSet,
  inputNode: RelNode,
  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
  rowRelDataType: RelDataType,
  inputType: RelDataType,
  grouping: Array[Int])
  extends SingleRel(cluster, traitSet, inputNode)
  with FlinkAggregate
  with DataStreamRel {
'''

// right!
'''
class DataStreamAggregate(
    window: LogicalWindow,
    namedProperties: Seq[NamedWindowProperty],
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputNode: RelNode,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    rowRelDataType: RelDataType,
    inputType: RelDataType,
    grouping: Array[Int])
  extends SingleRel(cluster, traitSet, inputNode)
  with FlinkAggregate
  with DataStreamRel {
'''

2. Methods with numerous arguments: indent each argument two spaces from the current
indent level:

// wrong!
'''
private[flink] def createIncrementalAggregateReduceFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    inputType: RelDataType,
    outputType: RelDataType,
    groupings: Array[Int]): IncrementalAggregateReduceFunction = {
'''

// right!
'''
private[flink] def createIncrementalAggregateReduceFunction(
  aggregates: Array[Aggregate[_ <: Any]],
  namedAggregates: Seq[CalcitePair[AggregateCall, String]],
  inputType: RelDataType,
  outputType: RelDataType,
  groupings: Array[Int]): IncrementalAggregateReduceFunction = {
'''




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89012187
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -231,6 +297,64 @@ object DataStreamAggregate {
 
   }
 
+  private def createAllWindowIncrementalAggregationFunction(
--- End diff --

agree!




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89012143
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
+  namedAggregates,
+  inputType,
+  getRowType,
+  grouping,
+  aggregates,
+  aggFieldIndexes)
+
+val mappedInput = inputDS
+  .map(mapFunction)
+  .name(prepareOpName)
+
+// grouped / keyed aggregation
+if (groupingKeys.length > 0) {
+
+  val winFunction =
+createWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+  val windowedStream = createKeyedWindowedStream(window, 
keyedStream)
+.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(keyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
+// global / non-keyed aggregation
+else {
+  val winFunction =
+createAllWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
+.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(nonKeyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
   }
-  // global / non-keyed aggregation
   else {
-val aggOpName = s"window: ($window), select: ($aggString)"
-val aggregateFunction =
-  createAllWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
-  .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r89012102
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

agree!




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@fhueske But you're right, we could enforce a copy there.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88861382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think a lot of people would use it for production use cases. (Maybe 
you're confusing this with the old distinction between `MemStateBackend` and 
`FsStateBackend`? Internally, both memory and file backend now use a 
`HeapStateBackend` but for the file backend the contents are checkpointed to a 
FileSystem.)




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88856847
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

@wuchong, yes, this is a problem with the `HeapStateBackend`. The RocksDB
backend does not suffer from it. I think in the long run we should migrate the
`HeapStateBackend` to always keep data in serialised form; then we won't have
this problem anymore.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-21 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88847319
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Okay, I have created [FLINK-5105](https://issues.apache.org/jira/browse/FLINK-5105)
and [FLINK-5106](https://issues.apache.org/jira/browse/FLINK-5106).




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88840617
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Yes, that would be the way to go, IMO. 
Can you create two JIRA issues (improving `ReduceState` and improving 
`IncrementalAggregateReduceFunction`) and link them?




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-20 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88824600
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Because of last week's vacation, I'm sorry for the late reply. Thanks to
@fhueske and @wuchong for the discussion. At the beginning I did try to return
value1 or value2, but it does not work; @fhueske is right! So I added the
accumulatorRow for the computation. Initially I created a new accumulatorRow
object on every call to the reduce method; later I changed accumulatorRow into
a class-shared variable to create fewer objects, which caused this error. So
before ReducingState is improved we may need to create accumulatorRow on each
reduce call, and after ReducingState is improved we can remove accumulatorRow.
Is this right?
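
For illustration, a minimal sketch of the per-call allocation described above
(assuming the `Aggregate` trait offers a `merge(intermediate, buffer)` operation and
that the group keys occupy the leading fields of both inputs; the key-copying detail
is hypothetical):

```scala
override def reduce(value1: Row, value2: Row): Row = {
  // allocate a fresh buffer on every call until ReducingState copies its input
  val accumulatorRow = new Row(intermediateRowArity)
  aggregates.foreach(_.initiate(accumulatorRow))

  // merge both inputs into the fresh buffer
  aggregates.foreach(_.merge(value1, accumulatorRow))
  aggregates.foreach(_.merge(value2, accumulatorRow))

  // group keys are identical in both inputs of the same key group,
  // so copying them from value1 is sufficient
  for (i <- groupKeysMapping.indices) {
    accumulatorRow.setField(i, value1.productElement(i))
  }

  accumulatorRow
}
```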




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-20 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88796985
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Hi @fhueske, you are right: in the case of sliding windows, the result will be
incorrect. But the `accumulatorRow` approach has the same problem, because the
same `accumulatorRow` object is used across multiple windows as reduce state.

Try this case 

```scala
val data = List(
(2L, 2, "Hello"),
(3L, 2, "Hello"),
(4L, 2, "Hello"))

val stream = env
  .fromCollection(data)
  .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
val table = stream.toTable(tEnv, 'long, 'int, 'string)

val windowedTable = table
  .groupBy('string)
  .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
  .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
```

The expected result should be 

```
"Hello,3,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 
23:59:59.995",
"Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0"
```

But actually it is 

```
"Hello,4,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 
23:59:59.995",
"Hello,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0"
```

I think it is a bug of `HeapReducingState`: an element put into (or got from)
the state should always be a copy. @aljoscha, what do you think about this?
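
To make the shared-reference hazard concrete, a self-contained sketch in plain
Scala (the `Buffer` class is a simplified stand-in for the mutable intermediate Row):

```scala
// Simplified stand-in for the mutable intermediate Row discussed above.
class Buffer(var count: Long) {
  def copy(): Buffer = new Buffer(count)
}

object SharedBufferHazard extends App {
  // The reduce state of two overlapping sliding windows ends up
  // referencing the SAME object (the shared accumulatorRow).
  val shared = new Buffer(0L)
  val windowA = shared
  val windowB = shared

  // Each window merges one element into what it believes is its own state.
  windowA.count += 1
  windowB.count += 1

  // Both windows now report 2, although each received only one element.
  println(s"A=${windowA.count}, B=${windowB.count}")  // prints A=2, B=2

  // Copying on insertion (as suggested for HeapReducingState) keeps states independent.
  val isolated = shared.copy()
  isolated.count += 1
  println(shared.count)  // still 2, unaffected by the copy's update
}
```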








[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-19 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88778828
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

That was my first thought as well, but it does not work. In the case of
overlapping sliding windows, multiple references are held to the same object.
If we modify `value1` or `value2`, the results are incorrect. We could open a
JIRA proposing that the first value put into a ReducingState is always copied.
That would allow us to modify and emit one of the two input values (the one
that comes from the state).




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-19 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88778110
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

Could we merge `value2` into `value1` and return `value1`, so that the
`accumulatorRow` could be left out?




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88641722
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
--- End diff --

Can you add a comment that this can be moved to `RichFunction.open()` once 
[FLINK-5094](https://issues.apache.org/jira/browse/FLINK-5094) is resolved?
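
For illustration, a sketch of the eventual shape once FLINK-5094 is resolved
(assuming the function becomes a `RichReduceFunction` so that `open()` is available):

```scala
override def open(config: Configuration): Unit = {
  // allocate the buffer once per task instead of lazily inside reduce()
  accumulatorRow = new Row(intermediateRowArity)
}
```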




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88657597
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

Can we add a separate method to `AggregateUtil` that creates the preparing 
`MapFunction`? This code is shared by all aggregations (batch and streaming, 
incremental and non-incremental), so it would be nice to have it extracted 
and the mapper applied outside of this large condition. It would also be 
great if you could refactor the DataSetAggregate code on the way.
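
For illustration, a sketch of such a shared factory, built from the 
`AggregateMapFunction` construction that already lives in `AggregateUtil` 
(the method name is illustrative, not the final API):

```scala
// creates the preparing mapper that projects the input into the
// intermediate aggregate Row (group keys + aggregate buffer fields);
// relies on the existing imports of AggregateUtil.scala
private[flink] def createPrepareMapFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    aggFieldIndexes: Array[Int],
    groupings: Array[Int],
    inputType: RelDataType): MapFunction[Any, Row] = {

  val mapReturnType: RowTypeInfo =
    createAggregateBufferDataType(groupings, aggregates, inputType)

  new AggregateMapFunction[Row, Row](
    aggregates, aggFieldIndexes, groupings, mapReturnType)
    .asInstanceOf[MapFunction[Any, Row]]
}
```

Both the batch and the streaming translation could then call this method and 
apply the mapper before branching into the incremental / non-incremental 
paths.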




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88638112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
--- End diff --

add space before `extends`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
+  namedAggregates,
+  inputType,
+  getRowType,
+  grouping,
+  aggregates,
+  aggFieldIndexes)
+
+val mappedInput = inputDS
+  .map(mapFunction)
+  .name(prepareOpName)
+
+// grouped / keyed aggregation
+if (groupingKeys.length > 0) {
+
+  val winFunction =
+createWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+  val windowedStream = createKeyedWindowedStream(window, 
keyedStream)
+.asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(keyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
+// global / non-keyed aggregation
+else {
+  val winFunction =
+createAllWindowIncrementalAggregationFunction(
+  aggregates,
+  groupingOffsetMapping,
+  aggOffsetMapping,
+  getRowType.getFieldCount,
+  intermediateRowArity,
+  window,
+  namedProperties)
+
+  val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
+.asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+  windowedStream
+.apply(reduceFunction, winFunction)
+.returns(rowTypeInfo)
+.name(nonKeyedAggOpName)
+.asInstanceOf[DataStream[Any]]
+}
   }
-  // global / non-keyed aggregation
   else {
-val aggOpName = s"window: ($window), select: ($aggString)"
-val aggregateFunction =
-  createAllWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val windowedStream = createNonKeyedWindowedStream(window, 
mappedInput)
-  .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
- 

[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88633784
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateMapFunction.scala
 ---
@@ -30,9 +30,9 @@ class AggregateMapFunction[IN, OUT](
 private val groupingKeys: Array[Int],
 @transient private val returnType: TypeInformation[OUT])
 extends RichMapFunction[IN, OUT] with ResultTypeQueryable[OUT] {
-  
--- End diff --

Please undo the changes on this file.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88635921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllTimeWindowFunction.scala
 ---
@@ -48,6 +48,6 @@ class AggregateAllTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(window,input,collector)
--- End diff --

Please add spaces between the function arguments.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88653721
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.common.functions.ReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Preconditions
+
+/**
+  * For Incremental intermediate aggregate Rows, merge every row into 
aggregate buffer.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  */
+class IncrementalAggregateReduceFunction(
+private val aggregates: Array[Aggregate[_]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)extends ReduceFunction[Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(groupKeysMapping)
+  @transient var accumulatorRow:Row = _
+
+  /**
+* For Incremental intermediate aggregate Rows, merge value1 and value2
+* into aggregate buffer, return aggregate buffer.
+*
+* @param value1 The first value to combined.
+* @param value2 The second value to combined.
+* @return The combined value of both input values.
+*
+*/
+  override def reduce(value1: Row, value2: Row): Row = {
+
+if(null == accumulatorRow){
+  accumulatorRow = new Row(intermediateRowArity)
+}
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(accumulatorRow))
--- End diff --

I think we can make this a bit more lightweight.
Instead of initializing `accumulatorRow` we could copy all fields of 
`value1`:
```
(0 until intermediateRowArity)
  .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
```

Then we only need to merge `value2` into `accumulatorRow` and do not need 
to copy the groupKeys (`groupKeysMapping` becomes obsolete).
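
Putting the two points together, the reduce body could look like this sketch 
(again assuming the `Aggregate.merge(intermediate, buffer)` contract used 
elsewhere in this PR):

```scala
override def reduce(value1: Row, value2: Row): Row = {
  if (null == accumulatorRow) {
    accumulatorRow = new Row(intermediateRowArity)
  }
  // copy all fields of value1: group keys and intermediate aggregates
  (0 until intermediateRowArity)
    .foreach(i => accumulatorRow.setField(i, value1.productElement(i)))
  // merge only value2 into the buffer; groupKeysMapping becomes obsolete
  aggregates.foreach(_.merge(value2, accumulatorRow))
  accumulatorRow
}
```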




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88658743
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -231,6 +297,64 @@ object DataStreamAggregate {
 
   }
 
+  private def createAllWindowIncrementalAggregationFunction(
--- End diff --

I think we can move all `create*Window*Function` methods (including the ones 
that were here before) to `AggregateUtil`.
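
For example, the method could keep its signature and only change its home 
(a sketch; the return type is an assumption derived from how the function is 
applied to the `AllWindowedStream`):

```scala
object AggregateUtil {
  // moved from DataStreamAggregate, body unchanged (sketch)
  private[flink] def createAllWindowIncrementalAggregationFunction(
      aggregates: Array[Aggregate[_ <: Any]],
      groupingOffsetMapping: Array[(Int, Int)],
      aggOffsetMapping: Array[(Int, Int)],
      finalRowArity: Int,
      intermediateRowArity: Int,
      window: LogicalWindow,
      properties: Seq[NamedWindowProperty])
    : AllWindowFunction[Row, Row, DataStreamWindow] = ???
}
```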




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88637570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.util.Collector
+
+/**
+  * It Evaluate final aggregate value.
+  *
+  * @param aggregates   The aggregate functions.
+  * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+  * and output Row.
+  * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+  * index in output Row.
+  */
+class IncrementalAggregateTimeWindowFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val finalRowArity: Int,
+private val windowStartPos: Option[Int],
+private val windowEndPos: Option[Int])
+  extends IncrementalAggregateWindowFunction[TimeWindow](
+aggregates,
+groupKeysMapping,
+aggregateMapping, finalRowArity) {
+
+  private var collector: TimeWindowPropertyCollector = _
+
+  override def open(parameters: Configuration): Unit = {
+collector = new TimeWindowPropertyCollector(windowStartPos, 
windowEndPos)
+super.open(parameters)
+  }
+
+  override def apply(
+key: Tuple,
+window: TimeWindow,
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+// set collector and window
+collector.wrappedCollector = out
+collector.timeWindow = window
+
+super.apply(key,window,records,collector)
--- End diff --

Please add spaces between the function arguments.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88659308
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +128,123 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
-
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val keyedAggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
+  s"window: ($window), " +
+  s"select: ($aggString)"
+val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"
+
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+  // check whether all aggregates support partial aggregate
+  if (aggregates.forall(_.supportPartial)){
+// do Incremental Aggregation
+// add grouping fields, position keys in the input, and input type
+val (mapFunction,
+reduceFunction,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity) = 
AggregateUtil.createOperatorFunctionsForIncrementalAggregates(
--- End diff --

This method could be split into three methods, one for the mapper, one for 
the reduce function, and one for the window function.
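
One possible split, with illustrative method names (the parameter names are 
taken from the quoted code; the actual signatures may differ):

```scala
// 1) the preparing mapper, shared with the non-incremental path
def createPrepareMapFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    aggFieldIndexes: Array[Int],
    groupings: Array[Int],
    inputType: RelDataType): MapFunction[Any, Row] = ???

// 2) the incremental pre-aggregation inside the window
def createIncrementalReduceFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    groupingOffsetMapping: Array[(Int, Int)],
    intermediateRowArity: Int): ReduceFunction[Row] = ???

// 3) the window function that evaluates and emits the final rows
def createIncrementalWindowFunction(
    aggregates: Array[Aggregate[_ <: Any]],
    groupingOffsetMapping: Array[(Int, Int)],
    aggOffsetMapping: Array[(Int, Int)],
    finalRowArity: Int): WindowFunction[Row, Row, Tuple, DataStreamWindow] = ???
```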




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-18 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r88636193
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateTimeWindowFunction.scala
 ---
@@ -52,6 +52,6 @@ class AggregateTimeWindowFunction(
 collector.timeWindow = window
 
 // call wrapped reduce function with property collector
-groupReduceFunction.reduce(input, collector)
+super.apply(key,window,input,collector)
--- End diff --

Please add spaces between function arguments.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87715204
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -194,9 +263,10 @@ class DataStreamAggregate(
   .name(mapName)
   case _ => result
 }
+result
--- End diff --

sure.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87714387
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
 ---
@@ -177,6 +177,58 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
   "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 
00:00:00.0")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCountWithAVG(): Unit = {
--- End diff --

Thanks @wuchong, strongly agree!




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87693820
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -103,19 +103,12 @@ class DataStreamAggregate(
   }
 
   override def translateToPlan(
-  tableEnv: StreamTableEnvironment,
-  expectedType: Option[TypeInformation[Any]])
-: DataStream[Any] = {
-
+tableEnv: StreamTableEnvironment,
+expectedType: Option[TypeInformation[Any]])
+  : DataStream[Any] = {
--- End diff --

Indent.

Could you revert this change?




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87693848
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
--- End diff --

indent  :)




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87693902
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
--- End diff --

It would be better to restore the comment that was above this code:

```
// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
```




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87693746
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
 ---
@@ -177,6 +177,58 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
   "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 
00:00:00.0")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCountWithAVG(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+val windowedTable = table
+  .groupBy('string)
+  .window(Slide over 4.rows every 2.rows)
+  .select('string, 'int.avg)
+
+val results = windowedTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("Hello world,3","Hello,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testEventTimeTumblingWindowWithAVG(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+
+val stream = env
+  .fromCollection(data)
+  .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+val windowedTable = table
+  .groupBy('string)
+  .window(Tumble over 5.milli on 'rowtime as 'w)
+  .select('string, 'int.avg)
--- End diff --

Same as above. It could be combined with `testEventTimeTumblingWindow` to 
reduce the number of tests.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87693718
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/AggregationsITCase.scala
 ---
@@ -177,6 +177,58 @@ class AggregationsITCase extends 
StreamingMultipleProgramsTestBase {
   "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 
00:00:00.0")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCountWithAVG(): Unit = {
--- End diff --

It is almost the same as `testProcessingTimeSlidingGroupWindowOverCount` 
except for the average aggregate. I would suggest moving `'int.avg` into 
`testProcessingTimeSlidingGroupWindowOverCount` and removing this test.
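
The merged test would then just extend the select list of the existing test, 
along these lines (a sketch; the window definition and the `'int.count` 
aggregate of the existing test are assumptions):

```scala
val windowedTable = table
  .groupBy('string)
  .window(Slide over 4.rows every 2.rows)
  // 'int.avg added next to the aggregates the test already checks (assumed)
  .select('string, 'int.count, 'int.avg)
```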




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87706978
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +130,124 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
 
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+
+  // check all aggregates are support Partial aggregate
+  if (aggregates.map(_.supportPartial).forall(x => x)) {
--- End diff --

This can be simplified to:

```scala
if (aggregates.forall(_.supportPartial)) {
```




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87708469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
--- End diff --

It is a little confusing what the difference between these two 
`createOperatorFunctionsForAggregates` methods is.

Could we remove the first one and call `transformToAggregateFunctions` 
explicitly before `createOperatorFunctionsForAggregates` in DataSetAggregate?
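
A sketch of the resulting call site in `DataSetAggregate.translateToPlan`, 
mirroring what the DataStream side already does:

```scala
val (aggFieldIndexes, aggregates) =
  AggregateUtil.transformToAggregateFunctions(
    namedAggregates.map(_.getKey), inputType, grouping.length)

val (mapFunction, reduceGroupFunction) =
  AggregateUtil.createOperatorFunctionsForAggregates(
    namedAggregates,
    inputType,
    getRowType,
    grouping,
    aggregates,
    aggFieldIndexes)
```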




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87707411
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -194,9 +263,10 @@ class DataStreamAggregate(
   .name(mapName)
   case _ => result
 }
+result
--- End diff --

The `result` can be omitted.




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87707098
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +130,124 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
 
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+
+  // check all aggregates are support Partial aggregate
+  if (aggregates.map(_.supportPartial).forall(x => x)) {
--- End diff --

There is too much code in the `translateToPlan` method. I would like to 
split the incremental and the non-incremental aggregation into separate 
private methods.

That would be more readable.
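
A structural sketch of that split inside `DataStreamAggregate` (method names 
are illustrative and parameter lists are trimmed):

```scala
// each path builds and applies its own window function
private def createIncrementalPlan(
    mappedInput: DataStream[Row],
    rowTypeInfo: RowTypeInfo): DataStream[Any] = ???

private def createNonIncrementalPlan(
    mappedInput: DataStream[Row],
    rowTypeInfo: RowTypeInfo): DataStream[Any] = ???

// translateToPlan then reduces to the branch decision:
val result: DataStream[Any] =
  if (aggregates.forall(_.supportPartial)) {
    createIncrementalPlan(mappedInput, rowTypeInfo)
  } else {
    createNonIncrementalPlan(mappedInput, rowTypeInfo)
  }
```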




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87707013
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -135,50 +130,124 @@ class DataStreamAggregate(
   namedProperties)
 
 val prepareOpName = s"prepare select: ($aggString)"
-val mappedInput = inputDS
-  .map(aggregateResult._1)
-  .name(prepareOpName)
-
-val groupReduceFunction = aggregateResult._2
-val rowTypeInfo = new RowTypeInfo(fieldTypes)
 
-val result = {
-  // grouped / keyed aggregation
-  if (groupingKeys.length > 0) {
-val aggOpName = s"groupBy: (${groupingToString(inputType, 
grouping)}), " +
-  s"window: ($window), " +
-  s"select: ($aggString)"
-val aggregateFunction =
-  createWindowAggregationFunction(window, namedProperties, 
groupReduceFunction)
-
-val keyedStream = mappedInput.keyBy(groupingKeys: _*)
-
-val windowedStream = createKeyedWindowedStream(window, keyedStream)
-  .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
-
-windowedStream
-  .apply(aggregateFunction)
-  .returns(rowTypeInfo)
-  .name(aggOpName)
-  .asInstanceOf[DataStream[Any]]
+val (aggFieldIndexes, aggregates) =
+  AggregateUtil.transformToAggregateFunctions(
+namedAggregates.map(_.getKey), inputType, grouping.length)
+
+val result: DataStream[Any] = {
+
+  // check all aggregates are support Partial aggregate
--- End diff --

minor, `are support Partial aggregate` => `support partial aggregate`




[GitHub] flink pull request #2792: [FLINK-4937] [Table] Add incremental group window ...

2016-11-13 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/2792#discussion_r87708645
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,25 +61,108 @@ object AggregateUtil {
* }}}
*
*/
-  def createOperatorFunctionsForAggregates(
+def createOperatorFunctionsForAggregates(
   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
   inputType: RelDataType,
   outputType: RelDataType,
   groupings: Array[Int])
 : (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row]) = {
 
-val aggregateFunctionsAndFieldIndexes =
-  transformToAggregateFunctions(namedAggregates.map(_.getKey), 
inputType, groupings.length)
-// store the aggregate fields of each aggregate function, by the same 
order of aggregates.
-val aggFieldIndexes = aggregateFunctionsAndFieldIndexes._1
-val aggregates = aggregateFunctionsAndFieldIndexes._2
+   val (aggFieldIndexes, aggregates)  =
+   transformToAggregateFunctions(namedAggregates.map(_.getKey),
+ inputType, groupings.length)
 
-val mapReturnType: RowTypeInfo =
-  createAggregateBufferDataType(groupings, aggregates, inputType)
+createOperatorFunctionsForAggregates(namedAggregates,
+  inputType,
+  outputType,
+  groupings,
+  aggregates,aggFieldIndexes)
+}
 
-val mapFunction = new AggregateMapFunction[Row, Row](
-aggregates, aggFieldIndexes, groupings,
-
mapReturnType.asInstanceOf[RowTypeInfo]).asInstanceOf[MapFunction[Any, Row]]
+def createOperatorFunctionsForAggregates(
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType,
+groupings: Array[Int],
+aggregates:Array[Aggregate[_ <: Any]],
+aggFieldIndexes:Array[Int])
+: (MapFunction[Any, Row], RichGroupReduceFunction[Row, Row])= {
+
+  val mapFunction = createAggregateMapFunction(aggregates,
+aggFieldIndexes, groupings, inputType)
+
+  // the mapping relation between field index of intermediate 
aggregate Row and output Row.
+  val groupingOffsetMapping = getGroupKeysMapping(inputType, 
outputType, groupings)
+
+  // the mapping relation between aggregate function index in list and 
its corresponding
+  // field index in output Row.
+  val aggOffsetMapping = getAggregateMapping(namedAggregates, 
outputType)
+
+  if (groupingOffsetMapping.length != groupings.length ||
+aggOffsetMapping.length != namedAggregates.length) {
+throw new TableException("Could not find output field in input 
data type " +
+  "or aggregate functions.")
+  }
+
+  val allPartialAggregate = aggregates.map(_.supportPartial).forall(x 
=> x)
+
+  val intermediateRowArity = groupings.length +
+aggregates.map(_.intermediateDataType.length).sum
+
+  val reduceGroupFunction =
+if (allPartialAggregate) {
+  new AggregateReduceCombineFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+else {
+  new AggregateReduceGroupFunction(
+aggregates,
+groupingOffsetMapping,
+aggOffsetMapping,
+intermediateRowArity,
+outputType.getFieldCount)
+}
+
+  (mapFunction, reduceGroupFunction)
+  }
+
+  /**
+* Create Flink operator functions for Incremental aggregates.
+* It includes 2 implementations of Flink operator functions:
+* [[org.apache.flink.api.common.functions.MapFunction]] and
+* [[org.apache.flink.api.common.functions.ReduceFunction]]
+* The output of [[org.apache.flink.api.common.functions.MapFunction]] 
contains the
+* intermediate aggregate values of all aggregate function, it's stored 
in Row by the following
+* format:
+*
+* {{{
+*        avg(x) aggOffsetInRow = 2       count(z) aggOffsetInRow = 5
+*                           |                                   |
+*                           v                                   v
+*   +---------+---------+--------+--------+--------+--------+
+*   |groupKey1|groupKey2|  sum1  | count1 |  sum2  | count2 |
+*   +---------+---------+--------+--------+--------+--------+
+*