[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984654#comment-15984654
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 closed the pull request at:

https://github.com/apache/flink/pull/3733


> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for
> different aggregates:
> 1. Add a delete/add property to Row.
> 2. Implement functions for generating retract messages for unbounded groupBy.
> 3. Implement functions for handling retract messages for different aggregates.
> 4. Handle retraction messages in CommonCorrelate and CommonCalc (retain the
> Delete property).
> Note: Currently, only the unbounded groupBy generates retractions, and it
> works in unbounded, processing-time mode. Hence, retraction is only
> supported for unbounded, processing-time aggregations so far. We can add
> more retraction support later.
> Supported now: unbounded groupBy; unbounded, processing-time over window.
> Unsupported now: group window; event-time or bounded over window.
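
For illustration, the retract flow described in the issue could look roughly like the sketch below. It is self-contained and is not the code from the pull request: `Change`, `GroupByCount`, and `SumOfCounts` are hypothetical stand-ins, and plain Scala collections replace Flink state. The group-by retracts the count it emitted earlier before emitting the updated one, and the downstream aggregate handles both message kinds.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a row with an add/delete property:
// isAdd = true is an accumulate message, isAdd = false is a retract message.
final case class Change(key: String, value: Long, isAdd: Boolean)

// Unbounded group-by COUNT that generates retract messages: before emitting an
// updated count for a key, it retracts the count it emitted previously.
final class GroupByCount {
  private val counts = mutable.Map.empty[String, Long]

  def process(key: String): List[Change] = {
    val previous = counts.get(key)
    val updated = previous.getOrElse(0L) + 1L
    counts(key) = updated
    val retraction = previous.map(c => Change(key, c, isAdd = false)).toList
    retraction :+ Change(key, updated, isAdd = true)
  }
}

// Downstream aggregate that consumes both message kinds: it accumulates on
// add messages and retracts on delete messages.
final class SumOfCounts {
  private var sum = 0L

  def process(change: Change): Long = {
    if (change.isAdd) sum += change.value else sum -= change.value
    sum
  }
}
```

Without the retract message, the downstream sum would count a key's old and new result together; with it, the old contribution is removed first.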



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984638#comment-15984638
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3733
  
Hi @hequn8128, I merged the PR to `table-retraction`. Can you close it?

Thanks, Fabian




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984524#comment-15984524
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3733
  
Thanks for the update @hequn8128. The PR looks really good, IMO.
I will do a couple of minor refactorings and merge it into the
`table-retraction` branch.

Best, Fabian




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984308#comment-15984308
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/3733
  
Hi @fhueske, thanks a lot for your review and help. I have addressed all
your comments and updated the PR. All changes have been checked before my
latest update.
Thanks, Hequn




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981878#comment-15981878
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049726
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -47,12 +48,12 @@ class ProcTimeBoundedRangeOver(
 genAggregations: GeneratedAggregationsFunction,
 precedingTimeBoundary: Long,
 aggregatesTypeInfo: RowTypeInfo,
-inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
 with Compiler[GeneratedAggregations] {
-  private var output: Row = _
+  private var output: CRow = _
   private var accumulatorState: ValueState[Row] = _
-  private var rowMapState: MapState[Long, JList[Row]] = _
--- End diff --

I think we should keep the state as `Row`. At the moment we do not need the 
`command` field.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981891#comment-15981891
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050568
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
 ---
@@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
 genAggregations: GeneratedAggregationsFunction,
 precedingOffset: Long,
 aggregatesTypeInfo: RowTypeInfo,
-inputType: TypeInformation[Row])
-  extends ProcessFunction[Row, Row]
+inputType: TypeInformation[CRow])
+  extends ProcessFunction[CRow, CRow]
 with Compiler[GeneratedAggregations] {
 
   Preconditions.checkArgument(precedingOffset > 0)
 
   private var accumulatorState: ValueState[Row] = _
-  private var rowMapState: MapState[Long, JList[Row]] = _
-  private var output: Row = _
+  private var rowMapState: MapState[Long, JList[CRow]] = _
--- End diff --

Same as for the other over window. Let's use `Row` for the state and 
extract the `Row` from the `CRow`.
`output` should be a `CRow`.
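
A minimal sketch of that split, assuming stand-in types: `Row`, `CRow`, and `BoundedRowsSum` below are hypothetical simplifications rather than the Flink classes, a queue replaces the `MapState`, and `windowSize` is an illustrative parameter. The buffered rows are plain `Row` values because the change flag is not needed once a row is stored; only the input and the emitted record are `CRow`.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Flink's Row and the CRow wrapper from the PR.
final case class Row(fields: Vector[Any])
final class CRow(var row: Row, var change: Boolean)

// Simplified bounded OVER aggregate: sum of field 0 over the last `windowSize` rows.
final class BoundedRowsSum(windowSize: Int) {
  require(windowSize > 0)

  // Stand-in for keyed state. It stores Row, not CRow.
  private val rowState = mutable.Queue.empty[Row]
  private var sum = 0L

  // A single reusable output object, typed as CRow.
  private val output = new CRow(null, true)

  def processElement(input: CRow): CRow = {
    val row = input.row // extract the Row from the CRow
    if (rowState.size == windowSize) {
      // Evict the oldest buffered row once the window is full.
      sum -= rowState.dequeue().fields(0).asInstanceOf[Long]
    }
    rowState.enqueue(row)
    sum += row.fields(0).asInstanceOf[Long]
    output.row = Row(row.fields :+ sum)
    output.change = true
    output
  }
}
```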




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981873#comment-15981873
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112901608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: TableConfig) {
 
 val genFunction = generator.generateFunction(
   functionName,
-  classOf[MapFunction[Row, T]],
+  classOf[MapFunction[P, T]],
--- End diff --

This means we need a dedicated `MapRunner` (and other runners) for 
streaming, but IMO, that's an easier change than touching the code-gen of the 
whole Table API.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981887#comment-15981887
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048507
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -159,18 +174,22 @@ object AggregateUtil {
 * @param generator   code generator instance
 * @param namedAggregates List of calls to aggregate functions and their output field names
 * @param inputType   Input row type
+* @param inputTypeInfo   Input DataStream row type
+* @param returnTypeInfo  Return DataStream row type
 * @param precedingOffset the preceding offset
-* @param isRowsClause   It is a tag that indicates whether the OVER clause is ROWS clause
+* @param isRowsClause    It is a tag that indicates whether the OVER clause is ROWS clause
 * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
 * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
 */
-  private[flink] def createBoundedOverProcessFunction(
+  private[flink] def createBoundedOverProcessFunction[T](
 generator: CodeGenerator,
 namedAggregates: Seq[CalcitePair[AggregateCall, String]],
 inputType: RelDataType,
+inputTypeInfo: TypeInformation[T],
--- End diff --

Remove the new type info parameters.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981880#comment-15981880
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050047
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -93,7 +94,7 @@ class ProcTimeBoundedRangeOver(
 var rowList = rowMapState.get(currentTime)
 // null value means that this is the first event received for this timestamp
 if (rowList == null) {
-  rowList = new ArrayList[Row]()
+  rowList = new ArrayList[CRow]()
--- End diff --

Keep `Row`




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981885#comment-15981885
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113046533
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
 ---
@@ -92,11 +92,13 @@ class DataStreamCalc(
 
 val genFunction = generator.generateFunction(
--- End diff --

If we generate a `FlatMapFunction[Row, Row]` we can use a 
`RetractFlatMapRunner` class to unwrap the `Row` from the `CRow` before calling 
the generated code.
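
As a rough sketch of this idea: the interfaces below are simplified stand-ins for Flink's `FlatMapFunction` and `Collector`, `Row` and `CRow` are hypothetical case classes, and `CRowWrappingCollector` is an assumed helper that is not named in this thread. The runner hands the bare `Row` to the generated function and re-wraps every emitted `Row` with the change flag of the input, so the generated code never sees `CRow`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins; not Flink's actual classes.
final case class Row(fields: Vector[Any])
final case class CRow(row: Row, change: Boolean)

trait Collector[T] { def collect(record: T): Unit }
trait FlatMapFunction[I, O] { def flatMap(value: I, out: Collector[O]): Unit }

// Assumed helper: re-wraps each Row emitted by the generated function into a
// CRow that carries the change flag of the current input.
final class CRowWrappingCollector(out: Collector[CRow]) extends Collector[Row] {
  var change: Boolean = true
  override def collect(record: Row): Unit = out.collect(CRow(record, change))
}

// Runner for streaming operators: unwraps the Row, then calls generated code
// that was produced for FlatMapFunction[Row, Row].
final class RetractFlatMapRunner(generated: FlatMapFunction[Row, Row])
    extends FlatMapFunction[CRow, CRow] {
  override def flatMap(value: CRow, out: Collector[CRow]): Unit = {
    val wrapper = new CRowWrappingCollector(out)
    wrapper.change = value.change
    generated.flatMap(value.row, wrapper)
  }
}
```

A production version would likely create the wrapping collector once and reuse it; allocating it per record here only keeps the sketch short.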




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981872#comment-15981872
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113043191
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
 ---
@@ -233,6 +234,9 @@ object CodeGenUtils {
   case ri: RowTypeInfo =>
 ProductAccessor(index)
 
+  case cri: CRowTypeInfo =>
--- End diff --

Revert these changes




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981893#comment-15981893
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113051216
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala
 ---
@@ -19,34 +19,44 @@
 package org.apache.flink.table.runtime.aggregate
 
 import org.apache.calcite.runtime.SqlFunctions
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
 
 /**
   * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
   * collector.
   */
-class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
-extends Collector[Row] {
+class TimeWindowPropertyCollector[T](windowStartOffset: Option[Int], windowEndOffset: Option[Int])
--- End diff --

We can make this an `abstract` class and add two subclasses,
`RowTimeWindowPropertyCollector` and `CRowTimeWindowPropertyCollector`.
That would avoid the `if` condition to identify the input type.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981884#comment-15981884
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048689
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -179,7 +198,9 @@ object AggregateUtil {
 needRetraction = true)
 
 val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
-val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
+val inputRowType = FlinkTypeFactory
--- End diff --

Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType))`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981877#comment-15981877
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113046948
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -22,17 +22,16 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.StreamTableEnvironment
 import org.apache.flink.table.plan.nodes.FlinkRel
-import org.apache.flink.types.Row
 
-trait DataStreamRel extends RelNode with FlinkRel {
+trait DataStreamRel[T] extends RelNode with FlinkRel {
--- End diff --

I don't think we need to parameterize `DataStreamRel`. All `DataStreamRel` 
nodes should return `CRow` in my opinion.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981895#comment-15981895
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048214
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -102,13 +110,17 @@ object AggregateUtil {
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
-  FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+  FlinkTypeFactory
+.toInternalRowTypeInfo(inputType, classOf[CRow])
+.asInstanceOf[CRowTypeInfo])
   } else {
 // RANGE unbounded over process function
 new RowTimeUnboundedRangeOver(
   genFunction,
   aggregationStateType,
-  FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+  FlinkTypeFactory
--- End diff --

Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType))`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981870#comment-15981870
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113044668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ---
@@ -23,26 +23,28 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.table.runtime.FlatMapRunner
+import org.apache.flink.table.runtime.types.CRowTypeInfo
 import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 trait CommonCalc {
 
-  private[flink] def functionBody(
+  private[flink] def functionBody[T](
--- End diff --

I would follow a similar strategy as with the `TableEnvironment` sink 
conversion for the `CommonX` classes.
The `CommonX` class has a method to generate the code for a function that 
operates on `Row`. Each class that extends `CommonX` has a dedicated method to 
instantiate the wrapper class (e.g., `MapRunner`, `FlatMapRunner`). The 
runner classes use the generated function to operate on `Row`s. For streaming 
operators, the runner classes unwrap the `Row` from the `CRow` before calling 
the generated code.
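
A minimal sketch of the unwrapping idea, assuming simplified stand-ins: `Row`, `CRow`, `RowFlatMap` and `CRowFlatMapRunner` below are hypothetical, self-contained types and not the Flink classes. The generated function only ever sees a `Row`; the streaming runner unwraps the `CRow` and re-wraps each result with the change flag of the input.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Flink's Row and the CRow wrapper from this PR.
final case class Row(fields: Vector[Any])
final class CRow(var row: Row, var change: Boolean)

// Stand-in for a code-generated function: it operates on Row only.
trait RowFlatMap {
  def flatMap(in: Row, out: Row => Unit): Unit
}

// Streaming runner: unwraps the Row, calls the generated function and
// re-wraps every result, retaining the change flag of the input record.
final class CRowFlatMapRunner(generated: RowFlatMap) {
  def flatMap(in: CRow, out: CRow => Unit): Unit =
    generated.flatMap(in.row, r => out(new CRow(r, in.change)))
}

object CRowFlatMapRunnerDemo {
  def run(): List[(Boolean, Row)] = {
    // A "generated" function that keeps only the first field.
    val keepFirstField = new RowFlatMap {
      def flatMap(in: Row, out: Row => Unit): Unit = out(Row(in.fields.take(1)))
    }
    val runner = new CRowFlatMapRunner(keepFirstField)
    val results = ArrayBuffer.empty[CRow]
    // A retraction message (change = false) stays a retraction after the map.
    runner.flatMap(new CRow(Row(Vector("a", 1)), false), c => results += c)
    results.map(c => (c.change, c.row)).toList
  }
}
```

With this split, the code generation does not need to know about `CRow` at all; only the runner does.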




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981889#comment-15981889
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050816
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -82,9 +83,9 @@ class ProcTimeBoundedRangeOver(
   }
 
   override def processElement(
-input: Row,
-ctx: ProcessFunction[Row, Row]#Context,
-out: Collector[Row]): Unit = {
+input: CRow,
+ctx: ProcessFunction[CRow, CRow]#Context,
+out: Collector[CRow]): Unit = {
 
 val currentTime = ctx.timerService.currentProcessingTime
--- End diff --

we can set `val row = input.row` and add `row` instead of `input` to the 
state.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981883#comment-15981883
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050330
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
 output = function.createOutputRow()
--- End diff --

When the code-gen'd functions need a `Row` to set the result, we can 
simply pass `output.row` and later emit `output`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981886#comment-15981886
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049237
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -31,35 +32,35 @@ abstract class GeneratedAggregations extends Function {
 *
 * @param accumulators the accumulators (saved in a row) which contains the current
 * aggregated results
-* @param output   output results collected in a row
+* @param output   output results collected in a command row
 */
-  def setAggregationResults(accumulators: Row, output: Row)
+  def setAggregationResults(accumulators: Row, output: CRow)
--- End diff --

I would change all `CRow` types back to `Row`. Let's leave the code 
generation as it is and extract the rows in the wrapper classes.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981881#comment-15981881
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113043582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -255,17 +256,23 @@ class CodeGenerator(
 *
 * @return A GeneratedAggregationsFunction
 */
-  def generateAggregations(
+  def generateAggregations[T](
--- End diff --

revert all changes in `CodeGenerator`. The wrapping functions (such as 
`MapRunner`) can extract the `Row` from a `CRow` and call the generated 
function with that object.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981894#comment-15981894
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113051486
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.types.Row
+
+/**
+  * Wrapper for a [[Row]] to add retraction information.
+  *
+  * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a
+  * retraction message.
+  *
+  * @param row The wrapped [[Row]].
+  * @param change true for an accumulate message, false for a retraction message.
+  */
+class CRow(var row: Row, var change: Boolean) {
+
+  override def toString: String = s"${if(change) "+" else "-"}$row"
+
+  override def equals(other: scala.Any): Boolean = {
+val otherCRow = other.asInstanceOf[CRow]
+row.equals(otherCRow.row) && change == otherCRow.change
+  }
+}
+
+class XRow(arity: Int, var change: Boolean) extends Row(arity) {
--- End diff --

Can this be removed?




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981874#comment-15981874
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113045326
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala
 ---
@@ -45,7 +45,7 @@ class DataStreamCalc(
 ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
-  with DataStreamRel {
+  with DataStreamRel[CRow] {
--- End diff --

`DataStreamRel` does not need a type parameter, IMO. All `DataStreamRel` 
nodes should produce `CRow`.
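
A rough sketch of what a non-generic node interface could look like. Everything here is an illustrative assumption: `DataStreamRelSketch`, `SourceSketch` and `CalcSketch` are hypothetical names, and `Seq[CRow]` merely stands in for a `DataStream[CRow]`.

```scala
// Hypothetical stand-ins for Flink's Row and the CRow wrapper from this PR.
final case class Row(fields: Vector[Any])
final case class CRow(row: Row, change: Boolean)

// No type parameter: every streaming node yields CRow records.
// Seq[CRow] is only a stand-in for a DataStream[CRow].
trait DataStreamRelSketch {
  def translateToPlan(): Seq[CRow]
}

final class SourceSketch(records: Seq[CRow]) extends DataStreamRelSketch {
  def translateToPlan(): Seq[CRow] = records
}

// A calc-like node: the predicate sees only the wrapped Row, and the
// change flag of each surviving record is passed through untouched.
final class CalcSketch(input: DataStreamRelSketch, predicate: Row => Boolean)
    extends DataStreamRelSketch {
  def translateToPlan(): Seq[CRow] =
    input.translateToPlan().filter(c => predicate(c.row))
}
```

Because the record type is fixed, nodes compose without threading a type argument through every `with DataStreamRel[...]` clause.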




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981892#comment-15981892
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049930
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
 output = function.createOutputRow()
--- End diff --

we can init `output` as `output = new CRow(function.createOutputRow(), 
true)`
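
A small sketch of this initialization, assuming simplified stand-ins: `Row`, `CRow`, `GeneratedAggregationsSketch` and `OverWindowSketch` are hypothetical, self-contained types that only mirror the method names visible in the diff. The generated function keeps working on a plain `Row`, while the operator holds and emits the `CRow`.

```scala
// Hypothetical stand-ins for Flink's Row and the CRow wrapper from this PR.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(pos: Int, value: Any): Unit = { fields(pos) = value }
  def getField(pos: Int): Any = fields(pos)
}
final class CRow(var row: Row, var change: Boolean)

// Stand-in for the code-generated aggregations: Row in, Row out.
trait GeneratedAggregationsSketch {
  def createOutputRow(): Row
  def setAggregationResults(accumulators: Row, output: Row): Unit
}

final class OverWindowSketch(function: GeneratedAggregationsSketch) {
  private var output: CRow = null

  // Wrap the generated output row once; `true` marks an accumulate message.
  def open(): Unit = {
    output = new CRow(function.createOutputRow(), true)
  }

  // Hand the wrapped Row to the generated code, then emit the CRow itself.
  def emit(accumulators: Row, out: CRow => Unit): Unit = {
    function.setAggregationResults(accumulators, output.row)
    out(output)
  }
}

object OverWindowSketchDemo {
  def run(): (Any, Boolean) = {
    val copyFirstField = new GeneratedAggregationsSketch {
      def createOutputRow(): Row = new Row(1)
      def setAggregationResults(accumulators: Row, output: Row): Unit =
        output.setField(0, accumulators.getField(0))
    }
    val fn = new OverWindowSketch(copyFirstField)
    fn.open()
    val acc = new Row(1)
    acc.setField(0, 42L)
    var emitted: CRow = null
    fn.emit(acc, c => emitted = c)
    (emitted.row.getField(0), emitted.change)
  }
}
```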




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981879#comment-15981879
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048361
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -118,7 +130,7 @@ object AggregateUtil {
   } else {
 new ProcTimeUnboundedNonPartitionedOver(
   genFunction,
-  aggregationStateType)
+  new CRowTypeInfo(aggregationStateType))
--- End diff --

we can keep `aggregationStateType` if we don't touch the code generation




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981882#comment-15981882
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048128
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -102,13 +110,17 @@ object AggregateUtil {
 new RowTimeUnboundedRowsOver(
   genFunction,
   aggregationStateType,
-  FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+  FlinkTypeFactory
--- End diff --

change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType))`




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981890#comment-15981890
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049009
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala
 ---
@@ -66,7 +66,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
 
   private var aggregateBuffer: Row = _
   private var output: Row = _
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: TimeWindowPropertyCollector[Row] = _
--- End diff --

We can make `TimeWindowPropertyCollector` abstract and implement two 
versions, one for `Row` and one for `CRow`
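
One possible shape for this, sketched under assumptions: the class names, the `rowOf` hook and the absolute field positions `startPos`/`endPos` are hypothetical and not taken from the Flink code base, and `Row`, `CRow` and `Collector` are simplified stand-ins. The shared logic lives in the abstract class; each version only says how to reach the `Row` inside its record type.

```scala
// Hypothetical stand-ins for Flink's Row, the CRow wrapper and Collector.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(pos: Int, value: Any): Unit = { fields(pos) = value }
  def getField(pos: Int): Any = fields(pos)
}
final class CRow(var row: Row, var change: Boolean)

trait Collector[T] {
  def collect(record: T): Unit
}

// Shared logic: write the window properties into the row, then forward.
abstract class TimeWindowPropertyCollectorSketch[T](
    startPos: Option[Int],
    endPos: Option[Int]) extends Collector[T] {

  var wrappedCollector: Collector[T] = null
  var windowStart: Long = 0L
  var windowEnd: Long = 0L

  // Each version says how to reach the Row inside a record.
  protected def rowOf(record: T): Row

  override def collect(record: T): Unit = {
    val row = rowOf(record)
    startPos.foreach(p => row.setField(p, windowStart))
    endPos.foreach(p => row.setField(p, windowEnd))
    wrappedCollector.collect(record)
  }
}

// Batch version: the record is the Row itself.
final class RowWindowPropertyCollector(start: Option[Int], end: Option[Int])
    extends TimeWindowPropertyCollectorSketch[Row](start, end) {
  protected def rowOf(record: Row): Row = record
}

// Streaming version: the Row is wrapped in a CRow.
final class CRowWindowPropertyCollector(start: Option[Int], end: Option[Int])
    extends TimeWindowPropertyCollectorSketch[CRow](start, end) {
  protected def rowOf(record: CRow): Row = record.row
}

object WindowCollectorDemo {
  def run(): (Any, Any, Boolean) = {
    var seen: CRow = null
    val collector = new CRowWindowPropertyCollector(Some(1), Some(2))
    collector.wrappedCollector = new Collector[CRow] {
      def collect(record: CRow): Unit = { seen = record }
    }
    collector.windowStart = 1000L
    collector.windowEnd = 2000L
    collector.collect(new CRow(new Row(3), false))
    (seen.row.getField(1), seen.row.getField(2), seen.change)
  }
}
```

Call sites such as the batch reduce function would then name the `Row` version directly instead of passing a type argument.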




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981888#comment-15981888
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113047960
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -61,23 +62,28 @@ object AggregateUtil {
 * @param generator   code generator instance
 * @param namedAggregates List of calls to aggregate functions and their output field names
 * @param inputType Input row type
+* @param inputTypeInfo Input DataStream row type
+* @param returnTypeInfo Return DataStream row type
 * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
 * @param isPartitioned It is a tag that indicate whether the input is partitioned
 * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
 */
-  private[flink] def createUnboundedOverProcessFunction(
+  private[flink] def createUnboundedOverProcessFunction[T](
 generator: CodeGenerator,
 namedAggregates: Seq[CalcitePair[AggregateCall, String]],
 inputType: RelDataType,
+inputTypeInfo: TypeInformation[T],
--- End diff --

We don't need these types if we keep the code generation as it is.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981869#comment-15981869
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112901968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 ---
@@ -175,14 +176,19 @@ object FlinkTypeFactory {
   /**
 * Converts a Calcite logical record into a Flink type information.
 */
-  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
+  def toInternalRowTypeInfo[T](logicalRowType: RelDataType, resultClass: Class[T])
--- End diff --

I would keep this method as it is. 
Whenever we need a `CRow` we can easily create it as 
`CRowTypeInfo(rowType)`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981867#comment-15981867
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113044998
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
 ---
@@ -38,7 +39,9 @@ trait BatchScan extends CommonScan with DataSetRel {
 
 val inputType = input.getType
 
-val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
--- End diff --

These changes can be reverted if we leave the 
`FlinkTypeFactory.toInternalRowTypeInfo()` method as it is.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981871#comment-15981871
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113043078
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -627,24 +640,27 @@ abstract class TableEnvironment(val config: TableConfig) {
 * @param functionName name of the map function. Must not be unique but has to be a
 * valid Java class identifier.
 */
-  protected def sinkConversion[T](
-  physicalRowTypeInfo: TypeInformation[Row],
+  protected def sinkConversion[T, P](
--- End diff --

I would refactor this as follows:
- make `sinkConversion()` an abstract method that is implemented by 
`BatchTableEnvironment` and `StreamTableEnvironment`
- extract the code-generation part that converts a `Row` into the requested 
data type and keep this in a method 
`TableEnvironment.generateRowConverterFunction()`
- the implementations of `sinkConversion()` can use the 
`generateRowConverterFunction`
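
The three steps above could be sketched roughly as follows. This is an illustration under assumptions, not the actual refactoring: the `...Sketch` classes, the abstract type member `Physical` and the simplified signatures are hypothetical, and a plain `Row => T` function stands in for the code-generated converter.

```scala
// Hypothetical stand-ins for Flink's Row and the CRow wrapper from this PR.
final case class Row(fields: Vector[Any])
final class CRow(val row: Row, val change: Boolean)

abstract class TableEnvironmentSketch {
  // Physical record type of the environment (an assumption of this sketch).
  type Physical

  // Shared part: build the Row-to-T converter. A plain function stands in
  // for the code generation here.
  protected def generateRowConverterFunction[T](convert: Row => T): Row => T =
    convert

  // Abstract: each environment decides how the converter is applied.
  def sinkConversion[T](convert: Row => T): Physical => T
}

final class BatchTableEnvironmentSketch extends TableEnvironmentSketch {
  type Physical = Row

  // Batch records are plain Rows, so the converter is used directly.
  def sinkConversion[T](convert: Row => T): Physical => T =
    generateRowConverterFunction(convert)
}

final class StreamTableEnvironmentSketch extends TableEnvironmentSketch {
  type Physical = CRow

  // Streaming records are CRows: unwrap the Row before converting.
  def sinkConversion[T](convert: Row => T): Physical => T = {
    val converter = generateRowConverterFunction(convert)
    (crow: CRow) => converter(crow.row)
  }
}

object SinkConversionDemo {
  private val render: Row => String = r => r.fields.mkString("|")

  def batch(): String =
    new BatchTableEnvironmentSketch().sinkConversion(render)(Row(Vector(1, "a")))

  def stream(): String =
    new StreamTableEnvironmentSketch()
      .sinkConversion(render)(new CRow(Row(Vector(2, "b")), true))
}
```

The converter itself never sees a `CRow`, which keeps the code generation identical for batch and streaming.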




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981875#comment-15981875
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112902344
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, 
TypeComparator, TypeSerializer}
+import 
org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, 
TypeComparatorBuilder}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+
+class CRowTypeInfo(val rowType: RowTypeInfo) extends 
CompositeType[CRow](classOf[CRow]) {
+
+  override def getFieldNames: Array[String] = rowType.getFieldNames
+
+  override def getFieldIndex(fieldName: String): Int = 
rowType.getFieldIndex(fieldName)
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
+rowType.getTypeAt(fieldExpression)
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] =
+rowType.getTypeAt(pos)
+
+  override def getFlatFields(
+  fieldExpression: String,
+  offset: Int,
+  result: util.List[FlatFieldDescriptor]): Unit =
+rowType.getFlatFields(fieldExpression, offset, result)
+
+  override def isBasicType: Boolean = rowType.isBasicType
+
+  override def isTupleType: Boolean = rowType.isTupleType
+
+  override def getArity: Int = rowType.getArity
+
+  override def getTotalFields: Int = rowType.getTotalFields
+
+  override def createSerializer(config: ExecutionConfig): 
TypeSerializer[CRow] =
+new CRowSerializer(rowType.createSerializer(config))
+
+  // not implemented because we override createComparator
+  override protected def createTypeComparatorBuilder(): 
TypeComparatorBuilder[CRow] = null
+
+  override def createComparator(
+  logicalKeyFields: Array[Int],
+  orders: Array[Boolean],
+  logicalFieldOffset: Int,
+  config: ExecutionConfig): TypeComparator[CRow] = {
+
+val rowComparator = rowType.createComparator(
+  logicalKeyFields,
+  orders,
+  logicalFieldOffset,
+  config)
+
+new CRowComparator(rowComparator)
+  }
+
+  override def equals(obj: scala.Any): Boolean = {
+if (this.canEqual(obj)) {
+  rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
+} else {
+  false
+}
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = 
obj.isInstanceOf[CRowTypeInfo]
+}
+
--- End diff --

Add 

```
object CRowTypeInfo {

  def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
    rowType match {
      case r: RowTypeInfo => new CRowTypeInfo(r)
    }
  }

}
```

for easy creation of `CRowTypeInfo`.
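
One possible variant of this factory, sketched with stand-in types so that 
it runs without Flink: the pattern match gets a fallback case, so an 
unexpected type fails with a descriptive message instead of a bare 
`MatchError`. The fallback case is an assumption and not part of the 
suggestion above; `TypeInfo`, `StringTypeInfo` and the simplified 
`RowTypeInfo` are hypothetical placeholders.

```scala
object CRowTypeInfoSketch {
  // Stand-ins for Flink's type information classes; illustration only.
  trait TypeInfo
  final case class RowTypeInfo(fieldNames: Seq[String]) extends TypeInfo
  case object StringTypeInfo extends TypeInfo

  final class CRowTypeInfo(val rowType: RowTypeInfo)

  object CRowTypeInfo {
    // Factory method: wraps a row type, rejecting anything that is not a RowTypeInfo.
    def apply(rowType: TypeInfo): CRowTypeInfo = rowType match {
      case r: RowTypeInfo => new CRowTypeInfo(r)
      // Assumption: an explicit error is preferable to the MatchError
      // that a single-case match would throw.
      case other =>
        throw new IllegalArgumentException(s"Expected a RowTypeInfo but got: $other")
    }
  }
}
```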



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981876#comment-15981876
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113047487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala
 ---
@@ -46,12 +47,12 @@ class AggregateAggFunction(
 accumulatorRow
   }
 
-  override def add(value: Row, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Unit = {
 
--- End diff --

Add `val row = value.row` here and use `row` to aggregate the values.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981868#comment-15981868
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112901336
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 ---
@@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: 
TableConfig) {
 
 val genFunction = generator.generateFunction(
   functionName,
-  classOf[MapFunction[Row, T]],
+  classOf[MapFunction[P, T]],
--- End diff --

I think we should not touch the code generator. Instead, we unwrap the 
`Row` from the `CRow` before handing it to the code-gen'd function. 
I'll add more comments on this below.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981087#comment-15981087
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/3733
  
Hi @fhueske, the latest commit is a bugfix. It mainly includes the 
following changes:

1. Add `CRow` support for the requested type in `TableEnvironment`, so that 
we can sink with `CRow`.
2. Add `CRow` support to `getFieldInfo` in `TableEnvironment`, so that we can 
create a `Table` from a `DataStream` of `CRow`.
3. Fix a type erasure problem in `TableEnvironment` and 
`DataStreamRetractionRules`.

Thanks, Hequn




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980122#comment-15980122
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112818000
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

Yes, you are right: we should update the state even if we do not emit new 
values. Thanks.
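
A small self-contained sketch of why the order matters, using an integer 
average whose output can stay the same while the accumulator changes. 
The names `Acc` and `AvgPerKey` and the in-memory maps are hypothetical 
stand-ins for the accumulator row and Flink's keyed state; this is not the 
code of `GroupAggProcessFunction`.

```scala
object GroupAggStateSketch {
  import scala.collection.mutable

  // Accumulator of an integer average: the result can stay equal while sum/count change.
  final case class Acc(sum: Long, count: Long) {
    def add(v: Long): Acc = Acc(sum + v, count + 1)
    def value: Long = sum / count
  }

  final class AvgPerKey {
    private val state = mutable.Map.empty[String, Acc]      // stand-in for keyed state
    private val previous = mutable.Map.empty[String, Long]  // last emitted result per key
    val emitted = mutable.ListBuffer.empty[(String, Long)]

    def processElement(key: String, v: Long): Unit = {
      val acc = state.getOrElse(key, Acc(0L, 0L)).add(v)
      // Persist the accumulator first, so the early return below cannot lose this update.
      state.update(key, acc)

      val output = acc.value
      if (previous.get(key).contains(output)) {
        // Same result as last time: nothing to emit, but the state is already up to date.
        return
      }
      previous.update(key, output)
      emitted += ((key, output))
    }
  }
}
```

Feeding 4, 4 and 10 for one key emits 4 and then 6. If the state update 
were skipped on the second 4, the third element would be averaged over 
two values instead of three and the result would be 7.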




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980123#comment-15980123
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112818005
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
--- End diff --

Hi, there is a `.select('count.sum)` after `groupBy('word).select('word as 
'word, 'num.sum as 'count)`, so the expected values are the running totals 
over all per-word counts.
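
To make the expected sequence easier to follow, here is a plain Scala 
simulation of the two stages, independent of Flink. It is a sketch under 
assumptions: `Change`, `groupByWord` and `globalSum` are hypothetical names, 
and the first stage is assumed to emit a retraction of the old per-word 
result before emitting the new one.

```scala
object RetractionSketch {
  import scala.collection.mutable

  // A change message; `isAdd = false` marks the retraction of a previously emitted row.
  final case class Change(word: String, count: Int, isAdd: Boolean)

  // Stage 1: groupBy('word).select('word, 'num.sum as 'count), emitting retractions.
  def groupByWord(input: Seq[(String, Int)]): Seq[Change] = {
    val sums = mutable.Map.empty[String, Int]
    input.flatMap { case (word, num) =>
      val old = sums.get(word)
      val updated = old.getOrElse(0) + num
      sums.update(word, updated)
      // Retract the previous result for this word (if any), then emit the new one.
      old.map(c => Change(word, c, isAdd = false)).toList :+ Change(word, updated, isAdd = true)
    }
  }

  // Stage 2: select('count.sum) over all rows, consuming the retractions.
  def globalSum(changes: Seq[Change]): Seq[Int] = {
    var total = 0
    changes.map { c =>
      total += (if (c.isAdd) c.count else -c.count)
      total
    }
  }
}
```

For the test data, the second "Hello" first retracts `(Hello, 1)`, which 
brings the total back to 1, and then adds `(Hello, 2)`, which raises it to 
3. The full sequence of totals is therefore 1, 2, 1, 3, 4.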



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980126#comment-15980126
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112817984
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 var accumulators = state.value()
 
 if (null == accumulators) {
+  previous = null
   accumulators = new Row(aggregates.length)
   i = 0
   while (i < aggregates.length) {
 accumulators.setField(i, aggregates(i).createAccumulator())
 i += 1
   }
+} else {
+  // get previous row
+  if (generateRetraction) {
+if (null == previous) {
+  previous = new Row(groupings.length + aggregates.length)
--- End diff --

done




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980128#comment-15980128
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112817910
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+

[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980125#comment-15980125
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112817968
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
 generator,
 inputDS,
 isRowTimeType = false,
-isRowsClause = overWindow.isRows)
+isRowsClause = overWindow.isRows,
+consumeRetraction)
--- End diff --

Hi, you are right. Maybe it is better not to support OVER for now.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980127#comment-15980127
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112817995
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = 
accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, 
input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

Hi, we do not need to output an identical row. Consider the batch case: 
only one result is output. Is that right?




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980124#comment-15980124
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112817965
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
 val groupingKeys = grouping.indices.toArray
 val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+if (consumeRetraction) {
+  throw new TableException(
+"Retraction on group window is not supported yet. Note: Currently, 
group window should " +
+  "not follow an unbounded groupby.")
--- End diff --

Yes, "non-windowed" is clearer.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973662#comment-15973662
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080186
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+    output.setField(index, aggregates(i).getValue(accumulator))
+    i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

We still need to update the state even if we do not emit new values. Suppose 
we have a max aggregation with an accumulator that holds a map of `10->1, 5->2` 
and we add `8`. The new accumulator will be `10->1, 8->1, 5->2`, but the 
aggregation result will still be 10. If we skip the state update and later 
retract `10`, the new max would be `5` but should be `8`.
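
To make the max example concrete, here is a minimal Java sketch of an accumulator that keeps a count per value. It is not the actual Flink `AggregateFunction` code: the class `MaxWithRetractSketch` and its method names are made up for illustration, and a `TreeMap` is only one possible way to hold the counts.

```java
import java.util.TreeMap;

class MaxWithRetractSketch {
    // value -> number of occurrences; sorted, so lastKey() is the current max
    private final TreeMap<Integer, Integer> counts = new TreeMap<>();

    void accumulate(int value) {
        counts.merge(value, 1, Integer::sum);
    }

    void retract(int value) {
        Integer c = counts.get(value);
        if (c == null) {
            return;                      // nothing to retract
        }
        if (c == 1) {
            counts.remove(value);        // last occurrence is gone
        } else {
            counts.put(value, c - 1);
        }
    }

    Integer getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }

    public static void main(String[] args) {
        MaxWithRetractSketch acc = new MaxWithRetractSketch();
        acc.accumulate(10);
        acc.accumulate(5);
        acc.accumulate(5);
        acc.accumulate(8);                   // result is still 10, but the map changed
        System.out.println(acc.getValue());  // prints 10
        acc.retract(10);
        System.out.println(acc.getValue());  // prints 8
    }
}
```

The second result is only `8` because the accumulator that recorded `8` was kept. If the early `return` skips the state update, the stored accumulator never sees the `8`.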




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973658#comment-15973658
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065233
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

Also, by adding the command to `Row`, we add serialization overhead to all 
jobs that use `Row` (including batch Table API / SQL).
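
A small Java sketch of the overhead, under loud assumptions: the record layout below (one `int`, one `long`) and the one-byte command encoding are stand-ins, not the real `RowSerializer` format, and `RowCommandOverheadSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RowCommandOverheadSketch {

    // Writes a stand-in record (one int, one long) and optionally appends a command byte.
    static byte[] serialize(int id, long count, boolean withCommand) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(id);
            out.writeLong(count);
            if (withCommand) {
                out.writeByte(0); // hypothetical encoding of the command
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(serialize(1, 2L, false).length); // prints 12
        System.out.println(serialize(1, 2L, true).length);  // prints 13
    }
}
```

In this sketch every record grows by one byte whether or not the job ever produces a retraction, and bytes written in the old layout no longer match what the new reader expects.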




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973656#comment-15973656
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112065725
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
 generator,
 inputDS,
 isRowTimeType = false,
-isRowsClause = overWindow.isRows)
+isRowsClause = overWindow.isRows,
+consumeRetraction)
--- End diff --

I think it would be better to enable retraction for all types of OVER 
aggregates at the same time. 
Supporting just one specific type causes more confusion than it resolves, in 
my opinion.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973659#comment-15973659
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112060500
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
fieldSerializers[i].serialize(o, target);
}
}
+   commandSerializer.serialize(record.command, target);
--- End diff --

I'm afraid we cannot change the serialization of `Row`. `Row` is a public 
class in `flink-core`, not an internal `flink-table` class. Hence, it is 
used in other places and might also be part of user applications. If we change 
the serialization, users might not be able to restore a job on 1.3 from a 
savepoint taken with 1.2. This restriction rules out simply adding a field to 
`Row`, which would have avoided major refactorings.

I see two options for adding the command field to the data streams in 
`flink-table`:

1. Use a regular field in `Row`. This would mean that the physical layout 
of the `Row` is no longer the same as the logical layout, i.e., the one 
expected by Calcite. However, we will probably change this anyway for the 
upcoming changes related to the time indicators. For these, the physical layout 
will have fewer fields than the logical layout (we will remove time fields 
which are in the metadata of Flink's records or taken as processing time). By 
adding the command field, we would add a field which is not in the logical 
layout. The problem with this approach is that the command field would be at 
different positions in the `Row` (probably the last one). We could leverage the 
changes introduced by the time indicator changes (or the other way round). 
@twalthr is working on this. You can have a look at the current status here: 
https://github.com/twalthr/flink/tree/FLINK-5884
2. Wrap the rows in a custom data type similar to a 
`Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and 
would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator` 
which forward most calls to the type info, serializer, and comparator of `Row`. 
The problem with this approach is that we need to change the return types of 
all functions. For some functions this might not be a big issue if we can take 
the `Row` object out of the wrapper before passing it to the code gen'd 
functions. The command field could be set when the result `Row` is returned or 
in a wrapping `Collector`.

My gut feeling is that the second approach is easier to implement because 
we (hopefully) do not need to touch the generated code and "just" need to wrap 
all `Row` objects in `CRow` objects.
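
A rough Java sketch of the wrapper idea, to show the forwarding pattern only. Everything here is hypothetical: `Ser` stands in for a serializer interface (it is not Flink's `TypeSerializer`), `WordCount` stands in for `Row`, and the boolean flag is just one possible encoding of the command.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class CRowSketch {

    /** Hypothetical minimal serializer interface. */
    interface Ser<T> {
        void write(T value, DataOutputStream out) throws IOException;
        T read(DataInputStream in) throws IOException;
    }

    /** Stand-in for a row: a (word, count) pair. */
    static final class WordCount {
        final String word;
        final long count;
        WordCount(String word, long count) { this.word = word; this.count = count; }
    }

    /** Stand-in for the existing row serializer; its format stays untouched. */
    static final class WordCountSer implements Ser<WordCount> {
        public void write(WordCount v, DataOutputStream out) throws IOException {
            out.writeUTF(v.word);
            out.writeLong(v.count);
        }
        public WordCount read(DataInputStream in) throws IOException {
            return new WordCount(in.readUTF(), in.readLong());
        }
    }

    /** Wrapper carrying the row plus a flag: true = add, false = delete. */
    static final class CRow<R> {
        final R row;
        final boolean add;
        CRow(R row, boolean add) { this.row = row; this.add = add; }
    }

    /** Forwards to the wrapped row serializer, then appends the flag. */
    static final class CRowSer<R> implements Ser<CRow<R>> {
        private final Ser<R> rowSer;
        CRowSer(Ser<R> rowSer) { this.rowSer = rowSer; }
        public void write(CRow<R> v, DataOutputStream out) throws IOException {
            rowSer.write(v.row, out);
            out.writeBoolean(v.add);
        }
        public CRow<R> read(DataInputStream in) throws IOException {
            R row = rowSer.read(in);
            return new CRow<>(row, in.readBoolean());
        }
    }

    /** Serializes and deserializes one wrapped row in memory. */
    static CRow<WordCount> roundTrip(CRow<WordCount> value) {
        try {
            CRowSer<WordCount> ser = new CRowSer<>(new WordCountSer());
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ser.write(value, new DataOutputStream(bytes));
            return ser.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CRow<WordCount> out = roundTrip(new CRow<>(new WordCount("Hello", 2), false));
        System.out.println(out.row.word + "," + out.row.count + "," + out.add); // prints Hello,2,false
    }
}
```

The point of the pattern is that the inner serializer is unchanged, so anything that already stores plain rows keeps its format; only streams that opt into the wrapper carry the extra flag.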




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973655#comment-15973655
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078358
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 var accumulators = state.value()
 
 if (null == accumulators) {
+  previous = null
   accumulators = new Row(aggregates.length)
   i = 0
   while (i < aggregates.length) {
 accumulators.setField(i, aggregates(i).createAccumulator())
 i += 1
   }
+} else {
+  // get previous row
+  if (generateRetraction) {
+if (null == previous) {
+  previous = new Row(groupings.length + aggregates.length)
--- End diff --

Can we initialize `previous` in `open()`?




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973665#comment-15973665
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081457
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val 

[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973664#comment-15973664
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078707
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
--- End diff --

rename to `newRow`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973657#comment-15973657
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112079294
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+    output.setField(index, aggregates(i).getValue(accumulator))
+    i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
--- End diff --

Check against `generateRetraction` instead. The check can be optimized because 
`generateRetraction` is a `val` and hence `final`.
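
A rough Java sketch of the emit path with the flag check. All names are hypothetical, a running sum stands in for the real accumulators and `Row` objects, and the string messages are only a way to make the output visible.

```java
import java.util.ArrayList;
import java.util.List;

class RetractEmitSketch {
    // Fixed at construction, like a Scala `val`.
    private final boolean generateRetraction;
    // Last emitted result; null until the first result was emitted.
    private Long previous;
    // Stand-in for the accumulator state.
    private long sum;

    RetractEmitSketch(boolean generateRetraction) {
        this.generateRetraction = generateRetraction;
    }

    // Returns the messages emitted for one input: "-x" retracts x, "+y" adds y.
    List<String> process(long value, boolean isDelete) {
        List<String> out = new ArrayList<>();
        sum += isDelete ? -value : value; // always update the state first
        if (generateRetraction && previous != null) {
            if (previous == sum) {
                return out;               // unchanged result: emit nothing
            }
            out.add("-" + previous);      // retract the old result
        }
        out.add("+" + sum);               // emit the new result
        previous = sum;
        return out;
    }

    public static void main(String[] args) {
        RetractEmitSketch f = new RetractEmitSketch(true);
        System.out.println(f.process(1, false)); // prints [+1]
        System.out.println(f.process(1, false)); // prints [-1, +2]
        System.out.println(f.process(0, false)); // prints []
    }
}
```

Branching on the constant flag first means the null check on `previous` is only evaluated when retraction is enabled, and the state update happens before any early return.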




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973663#comment-15973663
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112081928
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .select('num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'count.count)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val 

[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973660#comment-15973660
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112064885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
 val groupingKeys = grouping.indices.toArray
 val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+if (consumeRetraction) {
+  throw new TableException(
+"Retraction on group window is not supported yet. Note: Currently, group window should " +
+  "not follow an unbounded groupby.")
--- End diff --

`unbounded groupBy` -> `non-windowed GroupBy`?




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973661#comment-15973661
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112078731
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -41,14 +41,16 @@ class GroupAggProcessFunction(
 private val aggregates: Array[AggregateFunction[_]],
 private val aggFields: Array[Array[Int]],
 private val groupings: Array[Int],
-private val aggregationStateType: RowTypeInfo)
+private val aggregationStateType: RowTypeInfo,
+private val generateRetraction: Boolean)
   extends ProcessFunction[Row, Row] {
 
   Preconditions.checkNotNull(aggregates)
   Preconditions.checkNotNull(aggFields)
   Preconditions.checkArgument(aggregates.length == aggFields.length)
 
   private var output: Row = _
+  private var previous: Row = _
--- End diff --

rename to `previousRow`.




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973654#comment-15973654
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112080959
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
--- End diff --

Please use more test data.



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972723#comment-15972723
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959008
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
 ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
 val groupingKeys = grouping.indices.toArray
 val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+if (consumeRetraction) {
+  throw new TableException(
+"Retraction on group window is not supported yet. Note: Currently, group window should " +
+  "not follow an unbounded groupby.")
--- End diff --

1. 'is not supported yet' -> 'unsupported yet' may be more concise.
2. groupby -> groupBy



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972714#comment-15972714
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957949
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Command.java ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import java.io.Serializable;
+
+/**
+ * A Command is used in a {@link Row} to distinguish delete or add.
+ * Delete indicate a retracion row and Add means a nomal row.
--- End diff --

retracion -> retraction



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972719#comment-15972719
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958631
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -182,9 +192,17 @@ trait CommonCorrelate {
   returnType,
   rowType.getFieldNames.asScala)
 
+val retractionProcess =
--- End diff --

see CommonCalc.scala: L56



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972717#comment-15972717
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959829
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
 ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+("Hello", 1),
+("word", 1),
+("Hello", 1),
+("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .groupBy('count)
+  .select('count, 'word.count as 'frequency)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+env.setStateBackend(getStateBackend)
+
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'word, 'num)
+val resultTable = table
+  .groupBy('word)
+  .select('word as 'word, 'num.sum as 'count)
+  .select('count.sum)
+
+val results = resultTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq("1", "2", "1", "3", "4")
--- End diff --

Could you explain this example?
table.groupBy('word).select('word as 'word, 'num.sum as 'count)
I think this query does not produce retraction
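One way to see where a retraction would come from is to trace the chained query by hand. The sketch below is a toy model in plain Java, not Flink code: the class `RetractionWordCount` and its two maps are hypothetical stand-ins for the state of the two aggregations in `testWordCount`. It assumes that the first `groupBy` retracts its previous row before emitting an updated one, and that the second aggregation emits one result row for every message it consumes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of two chained groupBy aggregations; hypothetical, not Flink code. */
class RetractionWordCount {

    /** Feeds (word, 1) records through sum-per-word, then count-of-words-per-sum. */
    static List<String> run(List<String> words) {
        Map<String, Integer> sumPerWord = new HashMap<>();   // state of the first groupBy
        Map<Integer, Integer> wordsPerSum = new HashMap<>(); // state of the second groupBy
        List<String> emitted = new ArrayList<>();

        for (String word : words) {
            Integer previous = sumPerWord.get(word);
            int current = (previous == null ? 0 : previous) + 1;
            sumPerWord.put(word, current);

            if (previous != null) {
                // Delete message: the stale (word, previous) row is retracted downstream.
                int afterRetract = wordsPerSum.merge(previous, -1, Integer::sum);
                emitted.add(previous + "," + afterRetract);
            }
            // Add message: the new (word, current) row is accumulated downstream.
            int afterAdd = wordsPerSum.merge(current, 1, Integer::sum);
            emitted.add(current + "," + afterAdd);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("Hello", "word", "Hello", "bark")));
    }
}
```

For the input `Hello, word, Hello, bark`, `run` returns `1,1`, `1,2`, `1,1`, `2,1`, `1,2`, the same five rows as the `expected` sequence in `testWordCount`. Under the stated assumptions, the third row is the result of the Delete message issued when the sum for `Hello` changes from 1 to 2.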


[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972716#comment-15972716
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957812
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -171,6 +181,7 @@ public Row deserialize(DataInputView source) throws IOException {
result.setField(i, fieldSerializers[i].deserialize(source));
}
}
+   result.command = (Command) commandSerializer.deserialize(source);
--- End diff --

Should we use the overload that takes a reuse argument?
T deserialize(T reuse, DataInputView source)
It doesn't actually reuse anything here, though.
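To illustrate why the reuse overload changes nothing for an enum, here is a minimal self-contained sketch. `CommandCodecSketch` and its methods are hypothetical stand-ins written against `java.io.DataInput`; they are not Flink's `EnumSerializer` or `DataInputView`, and the 4-byte ordinal encoding is an assumption made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-in for an enum serializer; names are illustrative, not Flink's API. */
class CommandCodecSketch {
    enum Command { Add, Delete }

    /** Reads the ordinal and maps it back to the enum constant. */
    static Command deserialize(DataInput source) throws IOException {
        return Command.values()[source.readInt()];
    }

    /** Reuse overload: enum constants are immutable, so the reuse instance is ignored. */
    static Command deserialize(Command reuse, DataInput source) throws IOException {
        return deserialize(source);
    }

    /** Decodes one command from a 4-byte big-endian ordinal via the reuse overload. */
    static Command decode(byte[] bytes) {
        try {
            return deserialize(Command.Add, new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling the reuse overload keeps the two `deserialize` paths of the row serializer symmetric, even though the value returned is the same as with the single-argument form.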



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972711#comment-15972711
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958512
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 ---
@@ -135,11 +136,20 @@ trait CommonCorrelate {
   }
   val outerResultExpr = functionGenerator.generateResultExpression(
 input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+
+  val retractionProcess =
--- End diff --

see CommonCalc.scala: L56



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972715#comment-15972715
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958057
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -46,6 +46,9 @@
/** The array to store actual values. */
private final Object[] fields;
 
+   /** Indicate to add or delete this row */
--- End diff --

I prefer "Indicates that the line is add or delete type"



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972712#comment-15972712
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957675
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -40,6 +42,7 @@
private static final long serialVersionUID = 1L;
private final boolean[] nullMask;
private final TypeSerializer[] fieldSerializers;
+   private final EnumSerializer commandSerializer = new EnumSerializer(Command.class);
--- End diff --

Advice: initialize non-static final members in the constructor; otherwise 
make the field 'static final' and give it an upper-case name.
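The two options in this advice can be sketched side by side. Everything in the block is hypothetical: `EnumCodec` is a minimal stand-in for a stateless enum serializer, and `PerInstance` and `SharedConstant` are illustrative names, not classes from the pull request.

```java
/** Two ways to hold a helper that never changes; all names are illustrative. */
class FieldInitStyles {
    enum Command { Add, Delete }

    /** Minimal stand-in for a stateless enum serializer. */
    static final class EnumCodec<E extends Enum<E>> {
        private final E[] values;

        EnumCodec(Class<E> type) {
            this.values = type.getEnumConstants();
        }

        E fromOrdinal(int ordinal) {
            return values[ordinal];
        }
    }

    /** Option 1: per-instance final field, assigned in the constructor. */
    static final class PerInstance {
        private final EnumCodec<Command> commandSerializer;

        PerInstance() {
            this.commandSerializer = new EnumCodec<>(Command.class);
        }

        Command read(int ordinal) {
            return commandSerializer.fromOrdinal(ordinal);
        }
    }

    /** Option 2: one shared constant with an upper-case name. */
    static final class SharedConstant {
        private static final EnumCodec<Command> COMMAND_SERIALIZER =
                new EnumCodec<>(Command.class);

        Command read(int ordinal) {
            return COMMAND_SERIALIZER.fromOrdinal(ordinal);
        }
    }
}
```

Which option fits depends on whether the helper carries per-instance state; a stateless helper is a natural candidate for the shared constant.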



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972720#comment-15972720
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957849
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java
 ---
@@ -198,6 +209,7 @@ public Row deserialize(Row reuse, DataInputView source) throws IOException {
}
}
}
+   reuse.command = (Command) commandSerializer.deserialize(source);
--- End diff --

Same as L184.



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972721#comment-15972721
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958339
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
 ---
@@ -52,10 +53,18 @@ trait CommonCalc {
   rowType.getFieldNames,
   expandedExpressions)
 
+val retractionProcess =
--- End diff --

Rename retractionProcess to retractionProcessCode, or move this part to 
CodeGenerator for consistency of code style.
I prefer the latter.



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972722#comment-15972722
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959594
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+val index = groupings.length + i
+val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+output.setField(index, aggregates(i).getValue(accumulator))
+i += 1
+  }
 }
-state.update(accumulators)
 
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+// ignore same output
+return
--- End diff --

Is there any case where the 'same' row still needs to be output?
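The decision under discussion can be sketched in isolation. The block below is a simplified model, not the code from `GroupAggProcessFunction`: `RetractionEmitSketch.emit` is a hypothetical helper, rows are modelled as `Object[]`, and it assumes that a changed result is sent as a Delete of the previous row followed by an Add of the new one (the diff excerpt ends before that part).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified model of retraction generation for one group; not Flink code. */
class RetractionEmitSketch {

    /** Returns the messages to emit when a group's result goes from previous to current. */
    static List<String> emit(Object[] previous, Object[] current) {
        List<String> out = new ArrayList<>();
        if (previous != null) {
            if (Arrays.equals(previous, current)) {
                // Result unchanged: downstream already holds this row, emit nothing.
                return out;
            }
            // Result changed: withdraw the row that was emitted earlier.
            out.add("Delete" + Arrays.toString(previous));
        }
        out.add("Add" + Arrays.toString(current));
        return out;
    }
}
```

An unchanged result can occur, for instance, when a max aggregate receives a value below the current maximum. In this model skipping the output is safe only because the downstream operator already holds an identical row; whether any consumer relies on seeing one output per input is the open question raised in the comment.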




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972724#comment-15972724
 ] 

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959203
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
 generator,
 inputDS,
 isRowTimeType = false,
-isRowsClause = overWindow.isRows)
+isRowsClause = overWindow.isRows,
+consumeRetraction)
 } else if (
   overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
 overWindow.upperBound.isCurrentRow) {
   // bounded OVER window
+  if (consumeRetraction) {
+throw new TableException(
+  "Retraction for bounded over window is not supported yet. Note: Currently, bounded " +
--- End diff --

1. Retraction for -> on
2. groupby -> groupBy
3. Keep the term consistent: choose either 'OVER' or 'over' within the same 
context.



[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972713#comment-15972713 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958225
  
--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -46,6 +46,9 @@
    /** The array to store actual values. */
    private final Object[] fields;
 
+   /** Indicate to add or delete this row */
+   public Command command = Command.Add;
--- End diff --

I think it would be better to make this field private and provide a getter and setter.
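
As a rough illustration of this suggestion, the field could be encapsulated as sketched below. This is only a sketch: `RetractableRow` is a hypothetical stand-in, not Flink's actual `Row` class, and the accessor names `getCommand`/`setCommand` are assumptions. The `Command` values `Add` and `Delete` mirror the names used in the diff.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical stand-in for the row type in the diff; not Flink's actual Row class. */
final class RetractableRow {

    /** Mirrors the Command values that appear in the diff (Add, Delete). */
    enum Command { Add, Delete }

    /** The array to store actual values. */
    private final Object[] fields;

    /** Whether this row adds a record or deletes (retracts) one; defaults to Add. */
    private Command command = Command.Add;

    RetractableRow(int arity) {
        this.fields = new Object[arity];
    }

    /** Assumed accessor name; returns the current command. */
    Command getCommand() {
        return command;
    }

    /** Assumed mutator name; rejects null so the row is always either Add or Delete. */
    void setCommand(Command command) {
        this.command = Objects.requireNonNull(command, "command");
    }

    Object getField(int pos) {
        return fields[pos];
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }

    @Override
    public String toString() {
        return command + Arrays.toString(fields);
    }
}
```

With the field private, a check such as the null guard in the setter lives in one place instead of at every call site that previously assigned `command` directly.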




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972718#comment-15972718 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959451
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
     var accumulators = state.value()
 
     if (null == accumulators) {
+      previous = null
       accumulators = new Row(aggregates.length)
       i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
+    } else {
+      // get previous row
+      if (generateRetraction) {
+        if (null == previous) {
+          previous = new Row(groupings.length + aggregates.length)
+          // previous is used to output retract message, so command of previous will always be
+          // Command.Delete
+          previous.command = Command.Delete
+        }
+        i = 0
+        while (i < groupings.length) {
+          previous.setField(i, input.getField(groupings(i)))
+          i += 1
+        }
+        i = 0
+        while (i < aggregates.length) {
+          val index = groupings.length + i
--- End diff --

The local variable can be dropped by inlining the index:

    previous.setField(groupings.length + i, aggregates(i).getValue(accumulator))

The same applies to L118 and L127.
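
To make the row layout in this hunk concrete, here is a small Java sketch of the same idea: grouping-key fields first, aggregate values after them, with the target index computed inline. It uses plain arrays rather than Flink's `Row`, and `PreviousRowSketch` and `buildPrevious` are hypothetical names chosen for illustration only.

```java
/** Hypothetical helper; plain arrays stand in for the Row type used in the diff. */
final class PreviousRowSketch {

    /**
     * Builds [grouping key fields..., aggregate values...].
     * The aggregate slot index is computed inline as groupings.length + i,
     * so no temporary index variable is needed.
     */
    static Object[] buildPrevious(Object[] input, int[] groupings, Object[] aggregateValues) {
        Object[] previous = new Object[groupings.length + aggregateValues.length];
        for (int i = 0; i < groupings.length; i++) {
            // copy the grouping key fields from the input row
            previous[i] = input[groupings[i]];
        }
        for (int i = 0; i < aggregateValues.length; i++) {
            // aggregate values follow the grouping keys
            previous[groupings.length + i] = aggregateValues[i];
        }
        return previous;
    }
}
```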




[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates

2017-04-18 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972587#comment-15972587 ]

ASF GitHub Bot commented on FLINK-6091:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/3733

[FLINK-6091] [table] Implement and turn on retraction for aggregates

Implement functions for generating and consuming retract messages for different aggregates.

1. Add a delete/add property to Row.
2. Implement functions for generating retract messages for unbounded groupBy.
3. Implement functions for handling retract messages in the different aggregates.
4. Handle retraction messages in `CommonCorrelate` and `CommonCalc` (retain the Delete property).

Currently, only the unbounded groupBy generates retractions, and it runs in unbounded, processing-time mode. Hence, retraction is so far supported only for unbounded, processing-time aggregations. We can add more retraction support later.

Supported now: unbounded groupBy; unbounded, processing-time OVER window.
Unsupported now: group window; event-time or bounded OVER window.
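
The generate/consume behaviour described above can be pictured with a small, self-contained Java sketch. It is not the code from this pull request and does not use any Flink API: `RetractionSketch`, `CountPerKey`, `SumOfCounts`, and `Message` are hypothetical names. The upstream operator keeps an unbounded count per key and emits a Delete for its previous result before each new Add; the downstream aggregate accumulates Add messages and retracts Delete messages.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of retract messages; not Flink's implementation. */
final class RetractionSketch {

    enum Command { Add, Delete }

    /** A result message: the command plus a (key, count) pair. */
    static final class Message {
        final Command command;
        final String key;
        final long count;

        Message(Command command, String key, long count) {
            this.command = command;
            this.key = key;
            this.count = count;
        }

        @Override
        public String toString() {
            return command + "(" + key + ", " + count + ")";
        }
    }

    /** Upstream: unbounded count per key that retracts its previous result first. */
    static final class CountPerKey {
        private final Map<String, Long> counts = new HashMap<>();

        List<Message> process(String key) {
            List<Message> out = new ArrayList<>();
            Long previous = counts.get(key);
            if (previous != null) {
                // the earlier result for this key is no longer valid: retract it
                out.add(new Message(Command.Delete, key, previous));
            }
            long updated = (previous == null) ? 1L : previous + 1L;
            counts.put(key, updated);
            out.add(new Message(Command.Add, key, updated));
            return out;
        }
    }

    /** Downstream: a sum that accumulates Add messages and retracts Delete messages. */
    static final class SumOfCounts {
        private long sum;

        void consume(Message message) {
            if (message.command == Command.Add) {
                sum += message.count;
            } else {
                sum -= message.count;
            }
        }

        long getValue() {
            return sum;
        }
    }
}
```

Feeding the keys a, b, a through `CountPerKey` yields Add(a, 1), Add(b, 1), Delete(a, 1), Add(a, 2), and `SumOfCounts` ends at 3, the number of input records. If the Delete were dropped, the downstream sum would be 4, because the stale count for a would be counted alongside its replacement.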


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink FLINK-6091-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3733.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3733


commit 911e4516d3d5a17354c75d67e73153aa9194212b
Author: Hequn Cheng 
Date:   2017-04-18T08:54:09Z

[FLINK-6091] [table] Implement and turn on retraction for aggregates



