[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984654#comment-15984654 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 closed the pull request at:

    https://github.com/apache/flink/pull/3733

> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for
> different aggregates.
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is
> working under unbounded and processing time mode. Hence, retraction is only
> supported for unbounded and processing time aggregations so far. We can add
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
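The work items in the issue description can be illustrated with a small, self-contained Scala sketch. It does not use Flink: `Row` and `CRow` here are hypothetical stand-ins (a `CRow` pairs a `Row` with a boolean `change` flag, `true` for add and `false` for delete), and `UnboundedGroupByCount` and `RetractableSum` are illustrative names. It shows an unbounded GROUP BY COUNT that retracts its previous result before emitting the updated one (item 2), and a downstream SUM that consumes both kinds of messages (item 3).

```scala
import scala.collection.mutable

object GroupByRetractSketch {
  // Hypothetical stand-in for a result row of `SELECT key, COUNT(*) ... GROUP BY key`.
  final case class Row(key: String, count: Long)

  // Hypothetical stand-in for a change-flagged row (item 1):
  // change = true is an "add" message, change = false a "delete" (retract) message.
  final case class CRow(row: Row, change: Boolean)

  // Item 2: an unbounded GROUP BY that keeps one count per key forever and,
  // whenever a key's result changes, retracts the previously emitted row first.
  final class UnboundedGroupByCount {
    private val counts = mutable.Map.empty[String, Long]

    def processElement(key: String): Seq[CRow] = {
      val previous = counts.get(key)
      val updated = previous.getOrElse(0L) + 1L
      counts(key) = updated
      previous match {
        case None =>
          // First record for this key: there is nothing to retract yet.
          Seq(CRow(Row(key, updated), change = true))
        case Some(old) =>
          Seq(CRow(Row(key, old), change = false), CRow(Row(key, updated), change = true))
      }
    }
  }

  // Item 3: a downstream aggregate (a global SUM over the count column)
  // that accumulates add messages and retracts delete messages.
  final class RetractableSum {
    private var sum = 0L

    def accumulate(in: CRow): Unit =
      if (in.change) sum += in.row.count
      else sum -= in.row.count

    def result: Long = sum
  }
}
```

For the input keys `a`, `a`, `b`, the group-by emits `+(a, 1)`, `-(a, 1)`, `+(a, 2)`, `+(b, 1)`, and the downstream SUM ends at 3. Without the retraction of `(a, 1)`, the SUM would double count and end at 4.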
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984638#comment-15984638 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3733

Hi @hequn8128, I merged the PR to `table-retraction`. Can you close it?

Thanks, Fabian
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984524#comment-15984524 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3733

Thanks for the update, @hequn8128. The PR looks really good, IMO. I will do a couple of minor refactorings and merge it into the `table-retraction` branch.

Best, Fabian
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15984308#comment-15984308 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user hequn8128 commented on the issue:

    https://github.com/apache/flink/pull/3733

Hi @fhueske, thanks a lot for your review and help. I have addressed all your comments and updated the PR. All changes were checked before my latest update.

Thanks, Hequn
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981878#comment-15981878 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113049726

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -47,12 +48,12 @@ class ProcTimeBoundedRangeOver(
         genAggregations: GeneratedAggregationsFunction,
         precedingTimeBoundary: Long,
         aggregatesTypeInfo: RowTypeInfo,
    -    inputType: TypeInformation[Row])
    -  extends ProcessFunction[Row, Row]
    +    inputType: TypeInformation[CRow])
    +  extends ProcessFunction[CRow, CRow]
         with Compiler[GeneratedAggregations] {
    -  private var output: Row = _
    +  private var output: CRow = _
       private var accumulatorState: ValueState[Row] = _
    -  private var rowMapState: MapState[Long, JList[Row]] = _
    --- End diff --

    I think we should keep the state as `Row`. At the moment we do not need the `command` field.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981891#comment-15981891 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050568

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
    @@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver(
         genAggregations: GeneratedAggregationsFunction,
         precedingOffset: Long,
         aggregatesTypeInfo: RowTypeInfo,
    -    inputType: TypeInformation[Row])
    -  extends ProcessFunction[Row, Row]
    +    inputType: TypeInformation[CRow])
    +  extends ProcessFunction[CRow, CRow]
         with Compiler[GeneratedAggregations] {
       Preconditions.checkArgument(precedingOffset > 0)
       private var accumulatorState: ValueState[Row] = _
    -  private var rowMapState: MapState[Long, JList[Row]] = _
    -  private var output: Row = _
    +  private var rowMapState: MapState[Long, JList[CRow]] = _
    --- End diff --

    Same as for the other over window: let's use `Row` for the state and extract the `Row` from the `CRow`. `output` should be a `CRow`.
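To illustrate the suggestion (keep `Row` in state, unwrap it from the incoming `CRow`, emit a `CRow`), here is a minimal sketch of a processing-time bounded RANGE over window. It is not Flink's `ProcTimeBoundedRangeOver`: `Row`, `CRow`, the in-memory `TreeMap` standing in for `MapState`, the explicit `currentTime` argument standing in for the timer service, expiring rows inside `processElement` instead of in a timer callback, and the single SUM aggregate are all simplifying assumptions.

```scala
import scala.collection.mutable

object OverWindowStateSketch {
  // Hypothetical stand-ins for a single-column row and a change-flagged row.
  final case class Row(value: Long)
  final case class CRow(row: Row, change: Boolean)

  // Simplified SUM(value) OVER (RANGE BETWEEN <boundary> PRECEDING AND CURRENT ROW)
  // on processing time: rows more than `precedingTimeBoundary` ms older than now expire.
  final class ProcTimeBoundedRangeSum(precedingTimeBoundary: Long) {
    // State holds plain Rows keyed by arrival time; no add/delete flag is stored.
    private val rowMapState = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Row]]

    def processElement(input: CRow, currentTime: Long): CRow = {
      val row = input.row // unwrap once; only the Row goes into state
      rowMapState.getOrElseUpdate(currentTime, mutable.ArrayBuffer.empty[Row]) += row

      // Expire timestamps that fell out of the window (TreeMap keys are ascending).
      val expired = rowMapState.keys.takeWhile(_ < currentTime - precedingTimeBoundary).toList
      expired.foreach(t => rowMapState.remove(t))

      val sum = rowMapState.values.flatMap(_.map(_.value)).sum
      // The result leaves the operator wrapped in a CRow again, as an "add" message.
      CRow(Row(sum), change = true)
    }
  }
}
```

With a boundary of 10 ms, rows arriving at 0, 5, 10, and 11 ms with values 1, 2, 4, and 8 produce sums of 1, 3, 7, and 14; the row from time 0 expires only at 11 ms.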
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981873#comment-15981873 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112901608

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: TableConfig) {
         val genFunction = generator.generateFunction(
           functionName,
    -      classOf[MapFunction[Row, T]],
    +      classOf[MapFunction[P, T]],
    --- End diff --

    This means we need a dedicated `MapRunner` (and other runners) for streaming, but IMO that's an easier change than touching the code generation of the whole Table API.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981887#comment-15981887 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048507

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -159,18 +174,22 @@ object AggregateUtil {
         * @param generator code generator instance
         * @param namedAggregates List of calls to aggregate functions and their output field names
         * @param inputType Input row type
    +    * @param inputTypeInfo Input DataStream row type
    +    * @param returnTypeInfo Return DataStream row type
         * @param precedingOffset the preceding offset
    -    * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause
    +    * @param isRowsClauseIt is a tag that indicates whether the OVER clause is ROWS clause
         * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType
         * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]]
         */
    -  private[flink] def createBoundedOverProcessFunction(
    +  private[flink] def createBoundedOverProcessFunction[T](
         generator: CodeGenerator,
         namedAggregates: Seq[CalcitePair[AggregateCall, String]],
         inputType: RelDataType,
    +    inputTypeInfo: TypeInformation[T],
    --- End diff --

    Remove the new type info parameters.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981880#comment-15981880 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113050047

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
    @@ -93,7 +94,7 @@ class ProcTimeBoundedRangeOver(
         var rowList = rowMapState.get(currentTime)
         // null value means that this si the first event received for this timestamp
         if (rowList == null) {
    -      rowList = new ArrayList[Row]()
    +      rowList = new ArrayList[CRow]()
    --- End diff --

    Keep `Row`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981885#comment-15981885 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113046533

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---
    @@ -92,11 +92,13 @@ class DataStreamCalc(
         val genFunction = generator.generateFunction(
    --- End diff --

    If we generate a `FlatMapFunction[Row, Row]`, we can use a `RetractFlatMapRunner` class to unwrap the `Row` from the `CRow` before calling the generated code.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981872#comment-15981872 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113043191

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala ---
    @@ -233,6 +234,9 @@ object CodeGenUtils {
         case ri: RowTypeInfo =>
           ProductAccessor(index)
    +    case cri: CRowTypeInfo =>
    --- End diff --

    Revert these changes.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981893#comment-15981893 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113051216

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---
    @@ -19,34 +19,44 @@ package org.apache.flink.table.runtime.aggregate
     import org.apache.calcite.runtime.SqlFunctions
    +import org.apache.flink.table.runtime.types.CRow
     import org.apache.flink.types.Row
     import org.apache.flink.util.Collector
     /**
       * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped
       * collector.
       */
    -class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int])
    -    extends Collector[Row] {
    +class TimeWindowPropertyCollector[T](windowStartOffset: Option[Int], windowEndOffset: Option[Int])
    --- End diff --

    We can make this an `abstract` class and add two subclasses, `RowTimeWindowPropertyCollector` and `CRowTimeWindowPropertyCollector`. That would avoid the `if` condition that identifies the input type.
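A minimal sketch of the suggested split, assuming simplified stand-ins for `Row`, `CRow`, and `Collector` (only the two subclass names come from the review comment; everything else is illustrative). The shared logic lives once in the abstract base, and each subclass only states how to reach the `Row` inside its record type, so no runtime `if` on the input type is needed. Window offsets are assumed here to be negative positions counted from the end of the row.

```scala
import scala.collection.mutable

object WindowPropertyCollectorSketch {
  // Hypothetical mutable row, loosely modeled on Flink's Row.
  final class Row(val fields: Array[Any])
  final case class CRow(row: Row, change: Boolean)

  // Minimal collector interface (stand-in for org.apache.flink.util.Collector).
  trait Collector[T] { def collect(record: T): Unit }

  final class BufferCollector[T] extends Collector[T] {
    val records: mutable.ArrayBuffer[T] = mutable.ArrayBuffer.empty[T]
    override def collect(record: T): Unit = records += record
  }

  // Shared logic: write window start/end into the row, then forward the record.
  abstract class TimeWindowPropertyCollector[T](
      windowStartOffset: Option[Int],
      windowEndOffset: Option[Int]) extends Collector[T] {

    var wrappedCollector: Collector[T] = null
    var windowStart: Long = 0L
    var windowEnd: Long = 0L

    // The only type-specific part: how to reach the Row inside a record.
    protected def getRow(record: T): Row

    override def collect(record: T): Unit = {
      val row = getRow(record)
      val lastFieldPos = row.fields.length
      windowStartOffset.foreach(offset => row.fields(lastFieldPos + offset) = windowStart)
      windowEndOffset.foreach(offset => row.fields(lastFieldPos + offset) = windowEnd)
      wrappedCollector.collect(record)
    }
  }

  final class RowTimeWindowPropertyCollector(start: Option[Int], end: Option[Int])
      extends TimeWindowPropertyCollector[Row](start, end) {
    override protected def getRow(record: Row): Row = record
  }

  final class CRowTimeWindowPropertyCollector(start: Option[Int], end: Option[Int])
      extends TimeWindowPropertyCollector[CRow](start, end) {
    // The CRow itself is forwarded, so its add/delete flag is untouched.
    override protected def getRow(record: CRow): Row = record.row
  }
}
```

Because `CRowTimeWindowPropertyCollector` forwards the `CRow` itself, the add/delete flag reaches the wrapped collector unchanged.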
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981884#comment-15981884 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048689

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -179,7 +198,9 @@ object AggregateUtil {
           needRetraction = true)
         val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates)
    -    val inputRowType = FlinkTypeFactory.toInternalRowTypeInfo(inputType).asInstanceOf[RowTypeInfo]
    +    val inputRowType = FlinkTypeFactory
    --- End diff --

    Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType))`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981877#comment-15981877 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113046948

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala ---
    @@ -22,17 +22,16 @@ import org.apache.calcite.rel.RelNode
     import org.apache.flink.streaming.api.datastream.DataStream
     import org.apache.flink.table.api.StreamTableEnvironment
     import org.apache.flink.table.plan.nodes.FlinkRel
    -import org.apache.flink.types.Row
    -trait DataStreamRel extends RelNode with FlinkRel {
    +trait DataStreamRel[T] extends RelNode with FlinkRel {
    --- End diff --

    I don't think we need to parameterize `DataStreamRel`. All `DataStreamRel` nodes should return `CRow` in my opinion.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981895#comment-15981895 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113048214

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -102,13 +110,17 @@ object AggregateUtil {
             new RowTimeUnboundedRowsOver(
               genFunction,
               aggregationStateType,
    -          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    +          FlinkTypeFactory
    +            .toInternalRowTypeInfo(inputType, classOf[CRow])
    +            .asInstanceOf[CRowTypeInfo])
           } else {
             // RANGE unbounded over process function
             new RowTimeUnboundedRangeOver(
               genFunction,
               aggregationStateType,
    -          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
    +          FlinkTypeFactory
    --- End diff --

    Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowType(inputType)))`
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981870#comment-15981870 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r113044668

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
    @@ -23,26 +23,28 @@ import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rex._
     import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
     import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
     import org.apache.flink.table.api.TableConfig
     import org.apache.flink.table.calcite.FlinkTypeFactory
     import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
     import org.apache.flink.table.runtime.FlatMapRunner
    +import org.apache.flink.table.runtime.types.CRowTypeInfo
     import org.apache.flink.types.Row
     import scala.collection.JavaConversions._
     import scala.collection.JavaConverters._
     trait CommonCalc {
    -  private[flink] def functionBody(
    +  private[flink] def functionBody[T](
    --- End diff --

    I would follow a similar strategy for the `CommonX` classes as for the `TableEnvironment` sink conversion. The `CommonX` class has a method that generates the code for a function operating on `Row`. Each class that extends `CommonX` has a dedicated method to instantiate the wrapper class (e.g., `MapRunner`, `FlatMapRunner`). The runner classes use the generated function to operate on `Row`s. For streaming operators, the runner classes unwrap the `Row` from the `CRow` before calling the generated code.
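The runner strategy described in this comment can be sketched as follows, under heavy simplification: a plain Scala function stands in for the code-generated `FlatMapFunction[Row, Row]`, and `Row`/`CRow` are hypothetical stand-ins. `CommonCalcSketch`, `BatchFlatMapRunner`, `DataSetCalcSketch`, and `DataStreamCalcSketch` are invented names; `RetractFlatMapRunner` borrows the name proposed for `DataStreamCalc` in this review, but its body is only an illustration. The point is that the Row-level function is produced once in the shared trait, and only the runner differs between batch and streaming.

```scala
object CalcRunnerSketch {
  // Hypothetical stand-ins for Flink's Row and CRow.
  final case class Row(fields: Vector[Any])
  final case class CRow(row: Row, change: Boolean)

  // Stands in for the generated Calc code: it only ever sees Rows.
  type RowFlatMap = Row => Seq[Row]

  trait CommonCalcSketch {
    // Illustrative Calc: WHERE f0 > threshold, SELECT f0 * 2.
    def functionBody(threshold: Long): RowFlatMap = { row =>
      val f0 = row.fields(0).asInstanceOf[Long]
      if (f0 > threshold) Seq(Row(Vector(f0 * 2))) else Seq.empty
    }
  }

  // Batch runner: hands the Row straight to the generated function.
  final class BatchFlatMapRunner(fn: RowFlatMap) {
    def flatMap(in: Row): Seq[Row] = fn(in)
  }

  // Streaming runner: unwraps the Row, calls the same function, and
  // re-wraps every result with the input's add/delete flag.
  final class RetractFlatMapRunner(fn: RowFlatMap) {
    def flatMap(in: CRow): Seq[CRow] = fn(in.row).map(out => CRow(out, in.change))
  }

  // Each node only decides which runner to instantiate.
  object DataSetCalcSketch extends CommonCalcSketch {
    def translate(threshold: Long): BatchFlatMapRunner =
      new BatchFlatMapRunner(functionBody(threshold))
  }

  object DataStreamCalcSketch extends CommonCalcSketch {
    def translate(threshold: Long): RetractFlatMapRunner =
      new RetractFlatMapRunner(functionBody(threshold))
  }
}
```

A delete message that passes the filter leaves the streaming runner as a delete message, which matches item 4 of the issue description (retain the Delete property in `CommonCalc`).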
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981889#comment-15981889 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050816

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
@@ -82,9 +83,9 @@ class ProcTimeBoundedRangeOver(
   }
   override def processElement(
-    input: Row,
-    ctx: ProcessFunction[Row, Row]#Context,
-    out: Collector[Row]): Unit = {
+    input: CRow,
+    ctx: ProcessFunction[CRow, CRow]#Context,
+    out: Collector[CRow]): Unit = {
     val currentTime = ctx.timerService.currentProcessingTime
--- End diff --

We can set `val row = input.row` and add `row` instead of `input` to the state.
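A minimal sketch of this suggestion, assuming a heavily simplified function: a mutable map stands in for Flink's keyed `MapState`, `currentTime` is passed in instead of being read from `ctx.timerService`, and `ProcTimeRangeOverSketch` is a hypothetical name. Only `input.row` goes into state, so the state keeps a plain row type.

```scala
import scala.collection.mutable

// Minimal stand-in for org.apache.flink.types.Row.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def getArity: Int = arity
  def getField(pos: Int): Any = fields(pos)
  def setField(pos: Int, value: Any): Unit = fields(pos) = value
}

// Stand-in for the PR's CRow wrapper.
final class CRow(var row: Row, var change: Boolean)

// Hypothetical, heavily simplified bounded-range OVER function. Only the
// unwrapped Row is buffered, keyed by the processing time at which it arrived.
class ProcTimeRangeOverSketch {
  // Stand-in for keyed MapState[Long, java.util.List[Row]]: it holds Rows, not CRows.
  val rowMapState: mutable.Map[Long, mutable.ListBuffer[Row]] = mutable.Map.empty

  def processElement(input: CRow, currentTime: Long): Unit = {
    // Unwrap once; the buffered rows do not need the change flag.
    val row = input.row
    rowMapState.getOrElseUpdate(currentTime, mutable.ListBuffer.empty[Row]) += row
  }
}
```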
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981883#comment-15981883 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113050330

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
@@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
     output = function.createOutputRow()
--- End diff --

If the code-generated functions need a `Row` to set the result, we can simply pass `output.row` and later emit `output`.
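A sketch of this idea under simplifying assumptions: `GeneratedAggregationsSketch` is a hypothetical, trimmed-down stand-in for `GeneratedAggregations` that keeps the `Row`-only signature, `CountResultSetter` is a toy "generated" class, and `Row`/`CRow` are minimal stand-ins. The wrapper owns one `CRow` around the generated output row. It hands `output.row` to the generated code and emits `output`.

```scala
// Minimal stand-in for org.apache.flink.types.Row.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def getArity: Int = arity
  def getField(pos: Int): Any = fields(pos)
  def setField(pos: Int, value: Any): Unit = fields(pos) = value
}

// Stand-in for the PR's CRow wrapper.
final class CRow(var row: Row, var change: Boolean)

// Hypothetical stand-in for the generated class: it only knows about Rows.
abstract class GeneratedAggregationsSketch {
  def createOutputRow(): Row
  def setAggregationResults(accumulators: Row, output: Row): Unit
}

// Toy "generated" implementation: copies accumulator 0 into output field 0.
class CountResultSetter extends GeneratedAggregationsSketch {
  def createOutputRow(): Row = new Row(1)
  def setAggregationResults(accumulators: Row, output: Row): Unit =
    output.setField(0, accumulators.getField(0))
}

class OverWindowEmitterSketch(function: GeneratedAggregationsSketch) {
  // Wrap the generated output row once; true marks an accumulate message.
  private val output = new CRow(function.createOutputRow(), true)

  def emit(accumulators: Row, out: CRow => Unit): Unit = {
    // The generated code writes into the plain Row...
    function.setAggregationResults(accumulators, output.row)
    // ...and the wrapper emits the CRow around it.
    out(output)
  }
}
```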
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981886#comment-15981886 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049237

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -31,35 +32,35 @@ abstract class GeneratedAggregations extends Function {
     *
     * @param accumulators the accumulators (saved in a row) which contains the current
     *                     aggregated results
-    * @param output       output results collected in a row
+    * @param output       output results collected in a command row
     */
-  def setAggregationResults(accumulators: Row, output: Row)
+  def setAggregationResults(accumulators: Row, output: CRow)
--- End diff --

I would change all `CRow` types back to `Row`. Let's leave the code generation as it is and extract the rows in the wrapper classes.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981881#comment-15981881 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113043582

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -255,17 +256,23 @@ class CodeGenerator(
     *
     * @return A GeneratedAggregationsFunction
     */
-  def generateAggregations(
+  def generateAggregations[T](
--- End diff --

Revert all changes in `CodeGenerator`. The wrapping functions (such as `MapRunner`) can extract the `Row` from a `CRow` and call the generated function with that object.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981894#comment-15981894 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113051486

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import org.apache.flink.types.Row
+
+/**
+  * Wrapper for a [[Row]] to add retraction information.
+  *
+  * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a
+  * retraction message.
+  *
+  * @param row The wrapped [[Row]].
+  * @param change true for an accumulate message, false for a retraction message.
+  */
+class CRow(var row: Row, var change: Boolean) {
+
+  override def toString: String = s"${if(change) "+" else "-"}$row"
+
+  override def equals(other: scala.Any): Boolean = {
+    val otherCRow = other.asInstanceOf[CRow]
+    row.equals(otherCRow.row) && change == otherCRow.change
+  }
+}
+
+class XRow(arity: Int, var change: Boolean) extends Row(arity) {
--- End diff --

Can this be removed?
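The `equals` in this diff casts with `asInstanceOf`, so comparing a `CRow` with an object of any other type throws a `ClassCastException` instead of returning `false`. `hashCode` is also not overridden to match `equals`. A minimal sketch of a safer variant follows; `Row` is modeled as a case class only so the example runs without Flink.

```scala
import java.util.Objects

// Simplified stand-in for Flink's Row: a case class gives structural equals/hashCode.
final case class Row(fields: Vector[Any])

// Sketch of the PR's CRow with a pattern-matching equals (no ClassCastException
// for foreign types) and a hashCode that is consistent with equals.
class CRow(var row: Row, var change: Boolean) {
  override def toString: String = s"${if (change) "+" else "-"}$row"

  override def equals(other: Any): Boolean = other match {
    case that: CRow => row == that.row && change == that.change
    case _          => false
  }

  override def hashCode(): Int =
    31 * Objects.hashCode(row) + java.lang.Boolean.hashCode(change)
}
```

Since `row` and `change` are `var`s, mutating a `CRow` after placing it in a hash-based collection would break lookups. That is a general trade-off of mutable, reusable wrappers.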
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981874#comment-15981874 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113045326

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamCalc.scala ---
@@ -45,7 +45,7 @@ class DataStreamCalc(
     ruleDescription: String)
   extends Calc(cluster, traitSet, input, calcProgram)
   with CommonCalc
-  with DataStreamRel {
+  with DataStreamRel[CRow] {
--- End diff --

`DataStreamRel` does not need a type parameter, IMO. All `DataStreamRel` nodes should produce `CRow`.
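A dependency-free sketch of what dropping the type parameter buys: every streaming node returns the same element type, so nodes compose without casts. `DataStreamRelSketch`, `ScanSketch`, and `CalcSketch` are hypothetical names. A `Seq` stands in for a `DataStream`, and `CRow` is modeled as a case class for brevity.

```scala
// Simplified stand-ins (not Flink's classes).
final case class Row(fields: Vector[Any])
final case class CRow(row: Row, change: Boolean)

// Without a type parameter, every streaming node has the same output type.
trait DataStreamRelSketch {
  def translateToPlan(): Seq[CRow]
}

class ScanSketch(rows: Seq[Row]) extends DataStreamRelSketch {
  // A source only produces accumulate messages.
  def translateToPlan(): Seq[CRow] = rows.map(r => CRow(r, change = true))
}

class CalcSketch(input: DataStreamRelSketch, f: Row => Row) extends DataStreamRelSketch {
  // Apply the row-level function and keep each message's change flag.
  def translateToPlan(): Seq[CRow] =
    input.translateToPlan().map(c => c.copy(row = f(c.row)))
}
```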
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981892#comment-15981892 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049930

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---
@@ -69,10 +70,10 @@ class ProcTimeBoundedRangeOver(
     output = function.createOutputRow()
--- End diff --

We can initialize `output` as `output = new CRow(function.createOutputRow(), true)`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981879#comment-15981879 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048361

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -118,7 +130,7 @@ object AggregateUtil {
       } else {
         new ProcTimeUnboundedNonPartitionedOver(
           genFunction,
-          aggregationStateType)
+          new CRowTypeInfo(aggregationStateType))
--- End diff --

We can keep `aggregationStateType` if we don't touch the code generation.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981882#comment-15981882 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113048128

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -102,13 +110,17 @@ object AggregateUtil {
         new RowTimeUnboundedRowsOver(
           genFunction,
           aggregationStateType,
-          FlinkTypeFactory.toInternalRowTypeInfo(inputType))
+          FlinkTypeFactory
--- End diff --

Change to `CRowTypeInfo(FlinkTypeFactory.toInternalRowTypeInfo(inputType)))`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981890#comment-15981890 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113049009

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---
@@ -66,7 +66,7 @@ class DataSetSessionWindowAggReduceGroupFunction(
   private var aggregateBuffer: Row = _
   private var output: Row = _
-  private var collector: TimeWindowPropertyCollector = _
+  private var collector: TimeWindowPropertyCollector[Row] = _
--- End diff --

We can make `TimeWindowPropertyCollector` abstract and implement two versions, one for `Row` and one for `CRow`.
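One way to realize this suggestion, sketched with stand-in types: the shared base class writes the window start and end into the row, and each subclass only says how to reach the `Row` inside its record type. The class names and the use of absolute field positions are assumptions for illustration. The real collector's constructor and field layout may differ.

```scala
// Minimal stand-in for org.apache.flink.types.Row.
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def getArity: Int = arity
  def getField(pos: Int): Any = fields(pos)
  def setField(pos: Int, value: Any): Unit = fields(pos) = value
}

// Stand-ins for the PR's CRow and Flink's Collector.
final class CRow(var row: Row, var change: Boolean)
trait Collector[T] { def collect(record: T): Unit }

// Hypothetical abstract collector: all window-property logic lives here.
abstract class TimeWindowPropertyCollectorSketch[T](startPos: Int, endPos: Int)
    extends Collector[T] {
  var wrappedCollector: Collector[T] = null
  var windowStart: Long = 0L
  var windowEnd: Long = 0L

  // The only type-specific part: how to reach the Row inside a record.
  protected def rowOf(record: T): Row

  override def collect(record: T): Unit = {
    val row = rowOf(record)
    row.setField(startPos, windowStart)
    row.setField(endPos, windowEnd)
    wrappedCollector.collect(record)
  }
}

// Batch (DataSet) version: the record is the Row itself.
final class RowWindowPropertyCollector(start: Int, end: Int)
    extends TimeWindowPropertyCollectorSketch[Row](start, end) {
  override protected def rowOf(record: Row): Row = record
}

// Streaming version: unwrap the Row and forward the CRow with its flag intact.
final class CRowWindowPropertyCollector(start: Int, end: Int)
    extends TimeWindowPropertyCollectorSketch[CRow](start, end) {
  override protected def rowOf(record: CRow): Row = record.row
}
```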
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981888#comment-15981888 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113047960

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -61,23 +62,28 @@ object AggregateUtil {
     * @param generator       code generator instance
     * @param namedAggregates List of calls to aggregate functions and their output field names
     * @param inputType       Input row type
+    * @param inputTypeInfo   Input DataStream row type
+    * @param returnTypeInfo  Return DataStream row type
     * @param isRowTimeType   It is a tag that indicates whether the time type is rowTimeType
     * @param isPartitioned   It is a tag that indicate whether the input is partitioned
     * @param isRowsClause    It is a tag that indicates whether the OVER clause is ROWS clause
     */
-  private[flink] def createUnboundedOverProcessFunction(
+  private[flink] def createUnboundedOverProcessFunction[T](
     generator: CodeGenerator,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
     inputType: RelDataType,
+    inputTypeInfo: TypeInformation[T],
--- End diff --

We don't need these types if we keep the code generation as it is.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981869#comment-15981869 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r112901968

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---
@@ -175,14 +176,19 @@ object FlinkTypeFactory {
   /**
     * Converts a Calcite logical record into a Flink type information.
     */
-  def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = {
+  def toInternalRowTypeInfo[T](logicalRowType: RelDataType, resultClass: Class[T])
--- End diff --

I would keep this method as it is. Whenever we need the type information for a `CRow`, we can easily create it as `CRowTypeInfo(rowType)`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981867#comment-15981867 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113044998

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala ---
@@ -38,7 +39,9 @@ trait BatchScan extends CommonScan with DataSetRel {
     val inputType = input.getType
-    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
--- End diff --

These changes can be reverted if we leave the `FlinkTypeFactory.toInternalRowTypeInfo()` method as it is.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981871#comment-15981871 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r113043078

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -627,24 +640,27 @@ abstract class TableEnvironment(val config: TableConfig) {
     * @param functionName name of the map function. Must not be unique but has to be a
     *                     valid Java class identifier.
     */
-  protected def sinkConversion[T](
-      physicalRowTypeInfo: TypeInformation[Row],
+  protected def sinkConversion[T, P](
--- End diff --

I would refactor this as follows:

- make `sinkConversion()` an abstract method that is implemented by `BatchTableEnvironment` and `StreamTableEnvironment`
- extract the code-generation part that converts a `Row` into the requested data type and keep it in a method `TableEnvironment.generateRowConverterFunction()`
- the implementations of `sinkConversion()` can use `generateRowConverterFunction()`
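The proposed split can be sketched as a simplified model under stated assumptions. An abstract type member `Physical` stands in for the differing physical input types (`Row` for batch, `CRow` for streaming), and plain `Seq`s replace `DataSet` and `DataStream`. `generateRowConverterFunction` only wraps a Scala function here, where the real method would generate code. All class names are hypothetical. The streaming variant's rejection of retraction messages is a policy chosen for this sketch, not something the review specifies.

```scala
// Simplified stand-ins (not Flink's classes).
final case class Row(fields: Vector[Any])
final case class CRow(row: Row, change: Boolean)

abstract class TableEnvironmentSketch {
  /** Physical record type produced by this environment's plans. */
  type Physical

  // Shared step: a converter from an internal Row to the requested type.
  // Stand-in for the code-generated converter; here it just wraps `convert`.
  protected def generateRowConverterFunction[T](convert: Row => T): Row => T =
    (row: Row) => convert(row)

  // Implemented per environment because the physical input type differs.
  def sinkConversion[T](physical: Seq[Physical], convert: Row => T): Seq[T]
}

class BatchTableEnvironmentSketch extends TableEnvironmentSketch {
  type Physical = Row

  def sinkConversion[T](physical: Seq[Physical], convert: Row => T): Seq[T] = {
    val toT = generateRowConverterFunction(convert)
    physical.map(toT)
  }
}

class StreamTableEnvironmentSketch extends TableEnvironmentSketch {
  type Physical = CRow

  def sinkConversion[T](physical: Seq[Physical], convert: Row => T): Seq[T] = {
    val toT = generateRowConverterFunction(convert)
    physical.map { c =>
      // Assumed policy for this sketch: an append-only target rejects retractions.
      require(c.change, s"cannot convert retraction $c to an append-only type")
      toT(c.row)
    }
  }
}
```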
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981875#comment-15981875 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112902344 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.util
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer}
+import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+
+class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) {
+
+  override def getFieldNames: Array[String] = rowType.getFieldNames
+
+  override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName)
+
+  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] =
+    rowType.getTypeAt(fieldExpression)
+
+  override def getTypeAt[X](pos: Int): TypeInformation[X] =
+    rowType.getTypeAt(pos)
+
+  override def getFlatFields(
+      fieldExpression: String,
+      offset: Int,
+      result: util.List[FlatFieldDescriptor]): Unit =
+    rowType.getFlatFields(fieldExpression, offset, result)
+
+  override def isBasicType: Boolean = rowType.isBasicType
+
+  override def isTupleType: Boolean = rowType.isTupleType
+
+  override def getArity: Int = rowType.getArity
+
+  override def getTotalFields: Int = rowType.getTotalFields
+
+  override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] =
+    new CRowSerializer(rowType.createSerializer(config))
+
+  // not implemented because we override createComparator
+  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null
+
+  override def createComparator(
+      logicalKeyFields: Array[Int],
+      orders: Array[Boolean],
+      logicalFieldOffset: Int,
+      config: ExecutionConfig): TypeComparator[CRow] = {
+
+    val rowComparator = rowType.createComparator(
+      logicalKeyFields,
+      orders,
+      logicalFieldOffset,
+      config)
+
+    new CRowComparator(rowComparator)
+  }
+
+  override def equals(obj: scala.Any): Boolean = {
+    if (this.canEqual(obj)) {
+      rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType)
+    } else {
+      false
+    }
+  }
+
+  override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo]
+}
+
--- End diff --
Add
```
object CRowTypeInfo {
  def apply(rowType: TypeInformation[Row]): CRowTypeInfo = {
    rowType match {
      case r: RowTypeInfo => new CRowTypeInfo(r)
    }
  }
}
```
for easy creation of `CRowTypeInfo`.
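The suggested factory matches only `RowTypeInfo`, so any other `TypeInformation[Row]` would fail at runtime with a `scala.MatchError`. The sketch that follows shows the same factory shape with an explicit fallback case; `TypeInfo`, `RowTypeInfo`, `GenericTypeInfo` and `CRowTypeInfo` here are hypothetical stand-ins so the snippet compiles without Flink, not Flink's classes.

```scala
object CRowTypeInfoFactorySketch {
  // Hypothetical stand-ins for TypeInformation, RowTypeInfo and a non-row type info.
  sealed trait TypeInfo
  final case class RowTypeInfo(fieldNames: Seq[String]) extends TypeInfo
  final case class GenericTypeInfo(className: String) extends TypeInfo

  // Hypothetical stand-in for CRowTypeInfo: it simply wraps a RowTypeInfo.
  class CRowTypeInfo(val rowType: RowTypeInfo)

  object CRowTypeInfo {
    // Same shape as the suggested factory, plus a fallback case so that a
    // non-row type fails with a descriptive message instead of a MatchError.
    def apply(rowType: TypeInfo): CRowTypeInfo = rowType match {
      case r: RowTypeInfo => new CRowTypeInfo(r)
      case other =>
        throw new IllegalArgumentException(
          s"CRowTypeInfo can only wrap a RowTypeInfo, but got: $other")
    }
  }
}
```

Call sites can then write `CRowTypeInfo(rowType)` without repeating the pattern match.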
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981876#comment-15981876 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r113047487
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---
@@ -46,12 +47,12 @@ class AggregateAggFunction(
     accumulatorRow
   }
 
-  override def add(value: Row, accumulatorRow: Row): Unit = {
+  override def add(value: CRow, accumulatorRow: Row): Unit = {
--- End diff --
Add `val row = value.row` here and use `row` to aggregate the values.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981868#comment-15981868 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112901336
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
@@ -723,11 +738,11 @@ abstract class TableEnvironment(val config: TableConfig) {
 
     val genFunction = generator.generateFunction(
       functionName,
-      classOf[MapFunction[Row, T]],
+      classOf[MapFunction[P, T]],
--- End diff --
I think we should not touch the code generator. Instead, we unwrap the `Row` from the `CRow` before handing it to the code-gen'd function. I'll add more comments on this below.
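A minimal sketch of the unwrapping approach, assuming simplified stand-in types: `Row`, `CRow`, `GeneratedMapper` and `CRowMapRunner` are hypothetical and not Flink's classes. The generated function keeps its plain `Row => T` signature; only the wrapper knows about the change flag and carries it next to the converted value.

```scala
object CRowUnwrapSketch {
  // Hypothetical simplified stand-ins: a Row is a vector of fields and a CRow
  // pairs a Row with a change flag (true = accumulate, false = retract).
  final case class Row(fields: Vector[Any])
  final case class CRow(row: Row, change: Boolean)

  // Stand-in for a code-generated conversion that only understands plain Rows.
  type GeneratedMapper[T] = Row => T

  // Wrapper that leaves the generated code untouched: it unwraps the Row,
  // applies the generated function and carries the change flag alongside the result.
  final class CRowMapRunner[T](generated: GeneratedMapper[T]) extends (CRow => (Boolean, T)) {
    override def apply(in: CRow): (Boolean, T) = (in.change, generated(in.row))
  }
}
```

The design keeps retraction handling out of the code generator entirely, so the same generated function can serve both plain `Row` and `CRow` inputs.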
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15981087#comment-15981087 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/3733
Hi @fhueske, the latest commit is a bugfix. It mainly includes the following changes:
1. Add `CRow` support for the requested type in `TableEnvironment`, so that we can sink with `CRow`.
2. Add `CRow` support to `getFieldInfo` in `TableEnvironment`, so that we can create a `Table` from a `DataStream` of `CRow`.
3. Fix a type erasure problem in `TableEnvironment` and `DataStreamRetractionRules`.
Thanks,
Hequn
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980122#comment-15980122 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112818000
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
     }
 
     // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    if (input.command == Command.Delete) {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
 
-    state.update(accumulators)
+    // if previous is not null, do retraction process
+    if (null != previous) {
+      if (previous.equals(output)) {
+        // ignore same output
+        return
--- End diff --
Yes, you are right: we should update the state even if we do not emit new values. Thanks.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980123#comment-15980123 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112818005
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+    ("Hello", 1),
+    ("word", 1),
+    ("Hello", 1),
+    ("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .select('count.sum)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1", "2", "1", "3", "4")
--- End diff --
Hi, there is a `.select('count.sum)` after `groupBy('word).select('word as 'word, 'num.sum as 'count)`.
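To see where the expected values of both tests come from, the message flow can be replayed with a small in-memory model. This is a hypothetical sketch (`Msg`, `GroupAgg`, `wordCount` and `sumOfCounts` are illustrative names, not Flink APIs): each group-by retracts its previous result for a key before emitting the new one, skips emission when the result is unchanged, and always updates its state. The expected sequences in the tests appear to correspond to the accumulate messages only, so the sketch collects just those.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model of unbounded group-bys that emit
// retractions. It is not Flink's GroupAggProcessFunction.
object RetractionSketch {
  // accumulate = true is an "add" message; false retracts an earlier result.
  final case class Msg[K, V](accumulate: Boolean, key: K, value: V)

  final class GroupAgg[K, In, Acc](init: Acc, agg: (Acc, In, Boolean) => Acc) {
    // Per key: current accumulator and number of records currently accumulated.
    private val state = mutable.Map.empty[K, (Acc, Long)]

    def process(accumulate: Boolean, key: K, in: In): Seq[Msg[K, Acc]] = {
      val (acc, count) = state.getOrElse(key, (init, 0L))
      val newAcc = agg(acc, in, accumulate)
      val newCount = if (accumulate) count + 1 else count - 1
      // The state is always updated, even when nothing is emitted below.
      if (newCount > 0) state(key) = (newAcc, newCount) else state.remove(key)

      if (count > 0 && newCount > 0 && acc == newAcc) Seq.empty // unchanged: emit nothing
      else {
        val retractOld = if (count > 0) Seq(Msg(false, key, acc)) else Seq.empty
        val emitNew = if (newCount > 0) Seq(Msg(true, key, newAcc)) else Seq.empty
        retractOld ++ emitNew
      }
    }
  }

  val data = List(("Hello", 1), ("word", 1), ("Hello", 1), ("bark", 1))

  private def sumPerWord() =
    new GroupAgg[String, Int, Int](0, (acc, v, add) => if (add) acc + v else acc - v)

  // groupBy('word).select('word, 'num.sum as 'count)
  //   .groupBy('count).select('count, 'word.count as 'frequency)
  def wordCount(): Seq[String] = {
    val stage1 = sumPerWord()
    val stage2 = new GroupAgg[Int, String, Long](0L, (acc, _, add) => if (add) acc + 1 else acc - 1)
    val out = for {
      (word, num) <- data
      m1 <- stage1.process(true, word, num)
      m2 <- stage2.process(m1.accumulate, m1.value, m1.key)
    } yield m2
    out.filter(_.accumulate).map(m => s"${m.key},${m.value}")
  }

  // groupBy('word).select('word, 'num.sum as 'count).select('count.sum)
  def sumOfCounts(): Seq[String] = {
    val stage1 = sumPerWord()
    // A non-keyed aggregation is modelled as a group-by on a single constant key.
    val stage2 = new GroupAgg[Unit, Int, Int](0, (acc, v, add) => if (add) acc + v else acc - v)
    val out = for {
      (word, num) <- data
      m1 <- stage1.process(true, word, num)
      m2 <- stage2.process(m1.accumulate, (), m1.value)
    } yield m2
    out.filter(_.accumulate).map(_.value.toString)
  }
}
```

In this model the third input, `("Hello", 1)`, makes the first stage emit a retraction of `(Hello, 1)` followed by `(Hello, 2)`; the downstream sum therefore drops back to `1` before rising to `3`, which is why `"1"` appears twice in the expected sequence.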
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980126#comment-15980126 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112817984
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 
     var accumulators = state.value()
     if (null == accumulators) {
+      previous = null
       accumulators = new Row(aggregates.length)
       i = 0
       while (i < aggregates.length) {
         accumulators.setField(i, aggregates(i).createAccumulator())
         i += 1
       }
+    } else {
+      // get previous row
+      if (generateRetraction) {
+        if (null == previous) {
+          previous = new Row(groupings.length + aggregates.length)
--- End diff --
done
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980128#comment-15980128 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112817910
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+    ("Hello", 1),
+    ("word", 1),
+    ("Hello", 1),
+    ("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .select('count.sum)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1", "2", "1", "3", "4")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .select('num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'count.count)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980125#comment-15980125 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112817968
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
             generator,
             inputDS,
             isRowTimeType = false,
-            isRowsClause = overWindow.isRows)
+            isRowsClause = overWindow.isRows,
+            consumeRetraction)
--- End diff --
Hi, you are right. Maybe it is better for us not to support OVER for now.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980127#comment-15980127 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112817995
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
     }
 
     // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    if (input.command == Command.Delete) {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
 
-    state.update(accumulators)
+    // if previous is not null, do retraction process
+    if (null != previous) {
+      if (previous.equals(output)) {
+        // ignore same output
+        return
--- End diff --
Hi, we do not need to output the same row again. Think about batch: only one result would be output there, right?
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15980124#comment-15980124 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user hequn8128 commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112817965
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---
@@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate(
     val groupingKeys = grouping.indices.toArray
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
+    val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
+
+    if (consumeRetraction) {
+      throw new TableException(
+        "Retraction on group window is not supported yet. Note: Currently, group window should " +
+          "not follow an unbounded groupby.")
--- End diff --
Yes, "non-windowed" is clearer.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973662#comment-15973662 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112080186
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
     }
 
     // Set aggregate result to the final output
-    i = 0
-    while (i < aggregates.length) {
-      val index = groupings.length + i
-      val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-      aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-      output.setField(index, aggregates(i).getValue(accumulator))
-      i += 1
+    if (input.command == Command.Delete) {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
+    } else {
+      i = 0
+      while (i < aggregates.length) {
+        val index = groupings.length + i
+        val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+        aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+        output.setField(index, aggregates(i).getValue(accumulator))
+        i += 1
+      }
     }
 
-    state.update(accumulators)
+    // if previous is not null, do retraction process
+    if (null != previous) {
+      if (previous.equals(output)) {
+        // ignore same output
+        return
--- End diff --
We still need to update the state even if we do not emit new values. Suppose we have a max aggregation whose accumulator holds the map `10->1, 5->2`, and we add `8`. The new accumulator is `10->1, 8->1, 5->2`, but the aggregation result is still 10. If we do not store the new accumulator and later retract `10`, the new max would be `5`, although it should be `8`.
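The scenario from the comment can be replayed with a small retractable MAX accumulator modelled as a sorted multiset (value -> count). This is a hypothetical sketch, not Flink's implementation; `MaxWithRetractSketch`, `replay` and `updateStateOnUnchangedResult` are illustrative names. `replay` shows that dropping the state update when the emitted result does not change yields 5 instead of 8 after retracting 10.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical retractable MAX accumulator: a sorted multiset (value -> count).
object MaxWithRetractSketch {
  type MaxAcc = TreeMap[Int, Int]

  def accumulate(acc: MaxAcc, v: Int): MaxAcc = acc.updated(v, acc.getOrElse(v, 0) + 1)

  def retract(acc: MaxAcc, v: Int): MaxAcc = acc.get(v) match {
    case Some(1) => acc - v
    case Some(n) => acc.updated(v, n - 1)
    case None    => acc // unknown value: ignored in this sketch
  }

  // The current max is the largest key that still has a positive count.
  def getValue(acc: MaxAcc): Option[Int] = acc.lastOption.map(_._1)

  // State {10->1, 5->2}; add 8 (max stays 10, nothing new to emit); then retract 10.
  def replay(updateStateOnUnchangedResult: Boolean): Option[Int] = {
    var state: MaxAcc = TreeMap(10 -> 1, 5 -> 2)

    val afterAdd = accumulate(state, 8)
    val resultUnchanged = getValue(afterAdd) == getValue(state)
    if (!resultUnchanged || updateStateOnUnchangedResult) state = afterAdd

    state = retract(state, 10)
    getValue(state)
  }
}
```

`replay(updateStateOnUnchangedResult = true)` returns `Some(8)`, while `replay(updateStateOnUnchangedResult = false)` returns `Some(5)`: skipping the emission is safe, skipping the state update is not.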
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973658#comment-15973658 ]
ASF GitHub Bot commented on FLINK-6091:
---
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3733#discussion_r112065233
--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException {
         fieldSerializers[i].serialize(o, target);
       }
     }
+    commandSerializer.serialize(record.command, target);
--- End diff --
Also, by adding the command to `Row`, we add serialization overhead to all jobs that use `Row` (including batch Table API / SQL).
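One way to see where the cost lands: the `CRowTypeInfo` diff in this thread wraps the row serializer (`new CRowSerializer(rowType.createSerializer(config))`), so only streams that carry `CRow` pay for the change flag. The sketch below assumes the flag is written as one extra boolean after the row; `ChangeFlagOverheadSketch`, its `Row`/`CRow` types and both serializers are hypothetical simplifications that handle only `Int` fields, not Flink's `RowSerializer` or `CRowSerializer`.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical, simplified serializers (Int fields only) showing who pays for
// a per-record change flag.
object ChangeFlagOverheadSketch {
  final case class Row(fields: Array[Int])
  final case class CRow(row: Row, change: Boolean)

  // Plain row format: field count followed by the fields; no change flag.
  def serializeRow(row: Row, out: DataOutputStream): Unit = {
    out.writeInt(row.fields.length)
    row.fields.foreach(f => out.writeInt(f))
  }

  // Wrapper format: delegate to the row serializer, then append one boolean.
  def serializeCRow(crow: CRow, out: DataOutputStream): Unit = {
    serializeRow(crow.row, out)
    out.writeBoolean(crow.change)
  }

  // Number of bytes a write function produces.
  def sizeOf(write: DataOutputStream => Unit): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    write(out)
    out.flush()
    bytes.size()
  }
}
```

With three `Int` fields the plain row takes 16 bytes and the wrapped row 17 in this sketch; code paths that never create a `CRow` keep the 16-byte format.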
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973656#comment-15973656 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112065725 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -129,11 +131,17 @@ class DataStreamOverAggregate( generator, inputDS, isRowTimeType = false, -isRowsClause = overWindow.isRows) +isRowsClause = overWindow.isRows, +consumeRetraction) --- End diff -- I think it would be better to enable retraction for all types of OVER aggregates at the same time. Just supporting one specific type adds more confusion than it helps, in my opinion.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973659#comment-15973659 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112060500 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java --- @@ -152,6 +161,7 @@ public void serialize(Row record, DataOutputView target) throws IOException { fieldSerializers[i].serialize(o, target); } } + commandSerializer.serialize(record.command, target); --- End diff -- I'm afraid we cannot change the serialization of `Row`. `Row` is a public class in `flink-core` and not an internal `flink-table` class. Hence, it is used in other places and might also be part of user applications. If we change the serialization, users might not be able to restore a job on 1.3 from a savepoint taken with 1.2. This restriction rules out simply adding a field to `Row`, which would have avoided major refactorings. I see two options to add the command field to the data streams in `flink-table`: 1. Use a regular field in `Row`. This would mean that the physical layout of the `Row` is no longer the same as the logical layout, i.e., the one expected by Calcite. However, we will probably change this anyway for the upcoming changes related to the time indicators. For these, the physical layout will have fewer fields than the logical layout (we will remove time fields which are in the meta data of Flink's records or taken as processing time). By adding the command field, we would add a field which is not in the logical layout. The problem with this approach is that the command field would be at different positions in the Row (probably the last one). We could leverage the changes introduced by the time indicator changes (or the other way round). @twalthr is working on this. 
You can have a look at the current status here: https://github.com/twalthr/flink/tree/FLINK-5884 2. The other option is to wrap the rows in a custom data type similar to a `Tuple2[Row, Command]`. The data type could be named `Change` or `CRow` and would have its own `TypeInformation`, `TypeSerializer`, and `TypeComparator`, which forward most calls to the type info, serializer, and comparator of `Row`. The problem with this approach is that we need to change the return types of all functions. For some functions, this might not be a big issue if we can unwrap the `Row` object before passing it to the code gen'd functions. The command field could be set when the result Row is returned or in a wrapping `Collector`. My gut feeling is that the second approach is easier to implement because we (hopefully) do not need to touch the generated code and "just" need to wrap all `Row` objects in `CRow` objects.
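Option 2 can be sketched in Java as follows. `SimpleRow`, `SimpleRowSerializer`, `CRow`, and `CRowSerializer` are hypothetical stand-ins, not Flink's `TypeSerializer` API: the wrapper's serializer forwards to the unchanged row serializer and only appends the change flag, so under these assumptions the serialized form of a plain row is untouched.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for Row: its serialized format must not change. */
final class SimpleRow {
    final String[] fields;

    SimpleRow(String... fields) {
        this.fields = fields;
    }
}

/** Stand-in for the existing Row serializer; its format stays exactly as before. */
final class SimpleRowSerializer {
    void serialize(SimpleRow row, DataOutputStream out) throws IOException {
        out.writeInt(row.fields.length);
        for (String field : row.fields) {
            out.writeUTF(field);
        }
    }

    SimpleRow deserialize(DataInputStream in) throws IOException {
        String[] fields = new String[in.readInt()];
        for (int i = 0; i < fields.length; i++) {
            fields[i] = in.readUTF();
        }
        return new SimpleRow(fields);
    }
}

/** Hypothetical wrapper: a row plus its change flag (true = add, false = retraction). */
final class CRow {
    final SimpleRow row;
    final boolean change;

    CRow(SimpleRow row, boolean change) {
        this.row = row;
        this.change = change;
    }
}

/** Forwards to the row serializer and appends the flag afterwards. */
final class CRowSerializer {
    private final SimpleRowSerializer rowSerializer = new SimpleRowSerializer();

    byte[] toBytes(CRow value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            rowSerializer.serialize(value.row, out); // unchanged row format
            out.writeBoolean(value.change);          // wrapper-only extra byte
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    CRow fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            SimpleRow row = rowSerializer.deserialize(in);
            return new CRow(row, in.readBoolean());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this design, only streams that actually carry `CRow` values pay for the extra flag; code that handles plain rows never sees it.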
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973655#comment-15973655 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112078358 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -68,12 +70,35 @@ class GroupAggProcessFunction( var accumulators = state.value() if (null == accumulators) { + previous = null accumulators = new Row(aggregates.length) i = 0 while (i < aggregates.length) { accumulators.setField(i, aggregates(i).createAccumulator()) i += 1 } +} else { + // get previous row + if (generateRetraction) { +if (null == previous) { + previous = new Row(groupings.length + aggregates.length) --- End diff -- can we initialize `previous` in `open()`?
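Initializing the reusable row once in `open()` can be sketched with a hypothetical Java stand-in for a function's lifecycle. `LifecycleFunction` is not Flink's `ProcessFunction`; it only mimics `open()` being called once before any element is processed. The field names follow the renames suggested elsewhere in this review (`previousRow`, `newRow`), and the sketch tracks a single key only.

```java
/** Hypothetical lifecycle stand-in: open() runs once, processElement() once per element. */
abstract class LifecycleFunction {
    void open() {
    }

    abstract void processElement(long input);
}

/** Keeps previousRow/newRow as fields allocated once in open() (single key, simplified). */
final class GroupSumLifecycleSketch extends LifecycleFunction {
    private final int groupingCount;
    private final int aggregateCount;
    private Object[] previousRow; // hypothetical stand-in for Row
    private Object[] newRow;
    private long sum;

    GroupSumLifecycleSketch(int groupingCount, int aggregateCount) {
        this.groupingCount = groupingCount;
        this.aggregateCount = aggregateCount;
    }

    @Override
    void open() {
        // Allocated once, so processElement needs no "if (null == previous)" check.
        previousRow = new Object[groupingCount + aggregateCount];
        newRow = new Object[groupingCount + aggregateCount];
    }

    @Override
    void processElement(long input) {
        // Remember the last emitted result before computing the new one.
        System.arraycopy(newRow, 0, previousRow, 0, newRow.length);
        sum += input;
        newRow[groupingCount] = sum; // the aggregate follows the grouping fields
    }

    Object[] previousRow() {
        return previousRow;
    }

    Object[] newRow() {
        return newRow;
    }
}
```

Moving the allocation out of the per-element path keeps the hot loop free of a null check that can only be true once.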
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973665#comment-15973665 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081457 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val
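The expected values of `testWordCount` can be traced by hand. The Java sketch below models each non-windowed groupBy as a hypothetical `RetractingGroupSum` (COUNT is treated as a SUM of 1s): whenever a group's result changes, it first retracts the previous result and then adds the new one. The five strings in `expected` are exactly the add messages, in order. That the test sink records only add messages is an inference from these numbers, not something stated in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical change message: add (true) or retraction (false) of the result row (key, value). */
final class ChangeMsg {
    final boolean isAdd;
    final String key;
    final long value;

    ChangeMsg(boolean isAdd, String key, long value) {
        this.isAdd = isAdd;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return key + "," + value;
    }
}

/** Non-windowed "GROUP BY key, SUM(value)" that consumes and emits retractions. */
final class RetractingGroupSum {
    // key -> {current sum, number of contributing input rows}
    private final Map<String, long[]> state = new HashMap<>();

    /** Assumes every retraction refers to a row that was added earlier. */
    List<ChangeMsg> process(boolean isAdd, String key, long value) {
        List<ChangeMsg> out = new ArrayList<>();
        long[] acc = state.get(key);
        if (acc == null) {
            acc = new long[2];
            state.put(key, acc);
        } else {
            out.add(new ChangeMsg(false, key, acc[0])); // retract the previous result
        }
        acc[0] += isAdd ? value : -value;
        acc[1] += isAdd ? 1 : -1;
        if (acc[1] == 0) {
            state.remove(key); // group became empty: emit only the retraction
        } else {
            out.add(new ChangeMsg(true, key, acc[0]));
        }
        return out;
    }
}

final class WordCountTrace {
    /** SUM(num) GROUP BY word, then COUNT(word) GROUP BY that sum; records add messages only. */
    static List<String> run(String[] words, long[] nums) {
        RetractingGroupSum byWord = new RetractingGroupSum();
        RetractingGroupSum byCount = new RetractingGroupSum(); // COUNT = SUM of 1s
        List<String> sink = new ArrayList<>();
        for (int i = 0; i < words.length; i++) {
            for (ChangeMsg inner : byWord.process(true, words[i], nums[i])) {
                // Retractions from the first level are forwarded as retractions.
                for (ChangeMsg outer : byCount.process(inner.isAdd, String.valueOf(inner.value), 1)) {
                    if (outer.isAdd) {
                        sink.add(outer.toString());
                    }
                }
            }
        }
        return sink;
    }
}
```

For the input `("Hello", 1), ("word", 1), ("Hello", 1), ("bark", 1)`, `run` returns `1,1`, `1,2`, `1,1`, `2,1`, `1,2`. Feeding the first level's messages into a single-key `RetractingGroupSum` (a global SUM) instead reproduces the second test's expected `1, 2, 1, 3, 4` in the same way.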
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973664#comment-15973664 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112078707 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -41,14 +41,16 @@ class GroupAggProcessFunction( private val aggregates: Array[AggregateFunction[_]], private val aggFields: Array[Array[Int]], private val groupings: Array[Int], -private val aggregationStateType: RowTypeInfo) +private val aggregationStateType: RowTypeInfo, +private val generateRetraction: Boolean) extends ProcessFunction[Row, Row] { Preconditions.checkNotNull(aggregates) Preconditions.checkNotNull(aggFields) Preconditions.checkArgument(aggregates.length == aggFields.length) private var output: Row = _ --- End diff -- rename to `newRow`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973657#comment-15973657 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112079294 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -84,16 +109,38 @@ class GroupAggProcessFunction( } // Set aggregate result to the final output -i = 0 -while (i < aggregates.length) { - val index = groupings.length + i - val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) - output.setField(index, aggregates(i).getValue(accumulator)) - i += 1 +if (input.command == Command.Delete) { + i = 0 + while (i < aggregates.length) { +val index = groupings.length + i +val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] +aggregates(i).retract(accumulator, input.getField(aggFields(i)(0))) +output.setField(index, aggregates(i).getValue(accumulator)) +i += 1 + } +} else { + i = 0 + while (i < aggregates.length) { +val index = groupings.length + i +val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] +aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) +output.setField(index, aggregates(i).getValue(accumulator)) +i += 1 + } } -state.update(accumulators) +// if previous is not null, do retraction process +if (null != previous) { --- End diff -- Check against `generateRetraction`. The check can be optimized because `generateRetraction` is a `val` and hence `final`.
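The diff above calls `retract` for `Command.Delete` inputs and `accumulate` otherwise, and then emits a retraction of the previous result. The Java sketch below models one key of that logic with hypothetical types (`SumAcc`, `LongSumAgg`, and `RetractionAggSketch` are not Flink's `AggregateFunction` API) and gates the retraction on a `final` `generateRetraction` flag, as the review suggests, rather than on a `previous != null` check.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mutable accumulator for a SUM aggregate. */
final class SumAcc {
    long sum;
}

/** Hypothetical SUM aggregate exposing the accumulate/retract/getValue trio. */
final class LongSumAgg {
    SumAcc createAccumulator() {
        return new SumAcc();
    }

    void accumulate(SumAcc acc, long value) {
        acc.sum += value;
    }

    void retract(SumAcc acc, long value) {
        acc.sum -= value;
    }

    long getValue(SumAcc acc) {
        return acc.sum;
    }
}

/** One key of a group aggregation that consumes and (optionally) generates retractions. */
final class RetractionAggSketch {
    private final LongSumAgg agg = new LongSumAgg();
    private final boolean generateRetraction; // final field, as the review suggests
    private SumAcc acc;                       // stand-in for keyed state

    RetractionAggSketch(boolean generateRetraction) {
        this.generateRetraction = generateRetraction;
    }

    /** Returns emitted messages: "-v" retracts a previous result, "+v" adds a new one. */
    List<String> processElement(boolean isDelete, long value) {
        List<String> out = new ArrayList<>();
        boolean firstInput = acc == null;
        if (firstInput) {
            acc = agg.createAccumulator();
        }
        long previous = agg.getValue(acc);
        if (isDelete) {
            agg.retract(acc, value);    // consume a retraction
        } else {
            agg.accumulate(acc, value); // consume a normal row
        }
        if (generateRetraction && !firstInput) {
            out.add("-" + previous);    // retract the previously emitted result
        }
        out.add("+" + agg.getValue(acc));
        return out;
    }
}
```

With `generateRetraction = true`, adding 5, adding 2, and then retracting 5 emits `+5`, then `-5 +7`, then `-7 +2`; with the flag off, the same function only emits the `+` results.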
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973663#comment-15973663 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112081928 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( +("Hello", 1), +("word", 1), +("Hello", 1), +("bark", 1) + ) + + // keyed groupby + keyed groupby + @Test + def testWordCount(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .groupBy('count) + .select('count, 'word.count as 'frequency) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // keyed groupby + non-keyed groupby + @Test + def testGroupByAndNonKeyedGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val 
resultTable = table + .groupBy('word) + .select('word as 'word, 'num.sum as 'count) + .select('count.sum) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq("1", "2", "1", "3", "4") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + // non-keyed groupby + keyed groupby + @Test + def testNonKeyedGroupByAndGroupBy(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) +env.setStateBackend(getStateBackend) + +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'word, 'num) +val resultTable = table + .select('num.sum as 'count) + .groupBy('count) + .select('count, 'count.count) + +val results = resultTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973660#comment-15973660 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112064885 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate( val groupingKeys = grouping.indices.toArray val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) +val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + +if (consumeRetraction) { + throw new TableException( +"Retraction on group window is not supported yet. Note: Currently, group window should " + + "not follow an unbounded groupby.") --- End diff -- `unbounded groupBy` -> `non-windowed GroupBy`?
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973661#comment-15973661 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112078731 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala --- @@ -41,14 +41,16 @@ class GroupAggProcessFunction( private val aggregates: Array[AggregateFunction[_]], private val aggFields: Array[Array[Int]], private val groupings: Array[Int], -private val aggregationStateType: RowTypeInfo) +private val aggregationStateType: RowTypeInfo, +private val generateRetraction: Boolean) extends ProcessFunction[Row, Row] { Preconditions.checkNotNull(aggregates) Preconditions.checkNotNull(aggFields) Preconditions.checkArgument(aggregates.length == aggFields.length) private var output: Row = _ + private var previous: Row = _ --- End diff -- rename to `previousRow`.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973654#comment-15973654 ] ASF GitHub Bot commented on FLINK-6091: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r112080959 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream + +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test +import org.apache.flink.table.api.scala._ +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.table.utils.TableFunc0 + +import scala.collection.mutable + +/** + * tests for retraction + */ +class RetractionITCase extends StreamingWithStateTestBase { + // input data + val data = List( --- End diff -- Please use more test data.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972723#comment-15972723 ] ASF GitHub Bot commented on FLINK-6091: --- Github user lincoln-lil commented on a diff in the pull request: https://github.com/apache/flink/pull/3733#discussion_r111959008 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala --- @@ -107,6 +107,14 @@ class DataStreamGroupWindowAggregate( val groupingKeys = grouping.indices.toArray val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) +val consumeRetraction = DataStreamRetractionRules.isAccRetract(input) + +if (consumeRetraction) { + throw new TableException( +"Retraction on group window is not supported yet. Note: Currently, group window should " + + "not follow an unbounded groupby.") --- End diff -- 1. 'is not supported yet' -> 'unsupported yet' maybe more concisely. 2. groupby -> groupBy
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972714#comment-15972714 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957949

--- Diff: flink-core/src/main/java/org/apache/flink/types/Command.java ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.types;
+
+import java.io.Serializable;
+
+/**
+ * A Command is used in a {@link Row} to distinguish delete or add.
+ * Delete indicate a retracion row and Add means a nomal row.
--- End diff --

retracion -> retraction

> Implement and turn on the retraction for aggregates
> ---
>
> Key: FLINK-6091
> URL: https://issues.apache.org/jira/browse/FLINK-6091
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Shaoxuan Wang
> Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for
> different aggregates.
> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain
> Delete property).
> Note: Currently, only unbounded groupby generates retraction and it is
> working under unbounded and processing time mode. Hence, retraction is only
> supported for unbounded and processing time aggregations so far. We can add
> more retraction support later.
> supported now: unbounded groupby, unbounded and processing time over window
> unsupported now: group window, event time or bounded over window.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
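To make the Add/Delete distinction concrete, here is a minimal, self-contained sketch (not code from the PR) of how a consumer could apply a stream of Add and Delete rows to a materialized result, where a Delete cancels one earlier Add with equal field values. `Change` and `ChangelogView` are hypothetical names; only the two `Command` constants mirror the enum under review.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the two constants of the Command enum under review.
enum Command { Add, Delete }

// Hypothetical change message: a row's field values plus its Command.
final class Change {
    final Command command;
    final List<Object> fields;

    Change(Command command, Object... fields) {
        this.command = command;
        this.fields = Arrays.asList(fields.clone());
    }
}

final class ChangelogView {
    // Applies Add/Delete messages to a multiset of rows (row -> multiplicity).
    static Map<List<Object>, Integer> apply(List<Change> changes) {
        Map<List<Object>, Integer> view = new HashMap<>();
        for (Change c : changes) {
            int delta = c.command == Command.Add ? 1 : -1;
            // Returning null from the remapping function removes the entry,
            // so a row disappears once all of its Adds have been retracted.
            view.merge(c.fields, delta, (old, d) -> old + d == 0 ? null : old + d);
        }
        return view;
    }
}
```

For example, applying Add ("Hello", 1), Delete ("Hello", 1), Add ("Hello", 2) leaves a view that contains only ("Hello", 2).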
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972719#comment-15972719 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958631

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---
@@ -182,9 +192,17 @@ trait CommonCorrelate {
   returnType,
   rowType.getFieldNames.asScala)
+val retractionProcess =
--- End diff --

See CommonCalc.scala: L56.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972717#comment-15972717 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959829

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala ---
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * tests for retraction
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+    ("Hello", 1),
+    ("word", 1),
+    ("Hello", 1),
+    ("bark", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .select('count.sum)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1", "2", "1", "3", "4")
--- End diff --

Could you explain this example?

table.groupBy('word).select('word as 'word, 'num.sum as 'count)

I think this query does not produce retraction.
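Both expected sequences can be reproduced with a small model of the semantics described in the PR. In this model, an unbounded groupBy that already has a result for a key first emits a Delete of the previous row and then an Add of the updated row, and it emits nothing when the result is unchanged. A downstream aggregate consumes a Delete by retracting instead of accumulating. Under this model the first `groupBy('word)` does emit a retraction: the second ("Hello", 1) input makes it emit Delete ("Hello", 1) followed by Add ("Hello", 2). The sketch below is plain Java, not Flink code. `GroupAgg`, `Msg` and `RetractionDemo` are hypothetical, and the sketch records only the Add rows leaving the second operator, which is what the expected lists correspond to in this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

enum Command { Add, Delete }

// A change message: a Command plus the row's field values.
final class Msg {
    final Command cmd;
    final List<Object> row;

    Msg(Command cmd, List<Object> row) {
        this.cmd = cmd;
        this.row = row;
    }
}

// Unbounded group aggregate that emits retractions (single SUM or COUNT per group).
final class GroupAgg {
    private final int keyIndex;   // -1: non-keyed, one global group
    private final int valueIndex; // -1: COUNT, otherwise SUM over this field
    private final Map<Object, Long> state = new HashMap<>();

    GroupAgg(int keyIndex, int valueIndex) {
        this.keyIndex = keyIndex;
        this.valueIndex = valueIndex;
    }

    List<Msg> process(Msg in) {
        Object key = keyIndex < 0 ? "" : in.row.get(keyIndex);
        long delta = valueIndex < 0 ? 1 : ((Number) in.row.get(valueIndex)).longValue();
        if (in.cmd == Command.Delete) {
            delta = -delta; // consume a retraction by retracting, not accumulating
        }
        Long prev = state.get(key);
        long next = (prev == null ? 0L : prev) + delta;
        state.put(key, next);

        List<Msg> out = new ArrayList<>();
        if (prev != null && prev == next) {
            return out; // unchanged result: emit nothing
        }
        if (prev != null) {
            out.add(new Msg(Command.Delete, row(key, prev))); // retract the old result
        }
        out.add(new Msg(Command.Add, row(key, next)));
        return out;
    }

    private List<Object> row(Object key, long agg) {
        if (keyIndex < 0) {
            return Collections.<Object>singletonList(agg);
        }
        return Arrays.<Object>asList(key, agg);
    }
}

final class RetractionDemo {
    static List<List<Object>> data() {
        return Arrays.asList(
            Arrays.<Object>asList("Hello", 1),
            Arrays.<Object>asList("word", 1),
            Arrays.<Object>asList("Hello", 1),
            Arrays.<Object>asList("bark", 1));
    }

    // Feeds the input as Add rows through two chained aggregates and
    // records the Add rows leaving the second one as "f0,f1,..." strings.
    static List<String> run(List<List<Object>> input, GroupAgg first, GroupAgg second) {
        List<String> adds = new ArrayList<>();
        for (List<Object> row : input) {
            for (Msg m1 : first.process(new Msg(Command.Add, row))) {
                for (Msg m2 : second.process(m1)) {
                    if (m2.cmd == Command.Add) {
                        StringJoiner line = new StringJoiner(",");
                        for (Object field : m2.row) {
                            line.add(String.valueOf(field));
                        }
                        adds.add(line.toString());
                    }
                }
            }
        }
        return adds;
    }
}
```

When `new GroupAgg(0, 1)` is followed by `new GroupAgg(1, -1)`, the recorded Add rows are 1,1 / 1,2 / 1,1 / 2,1 / 1,2. When `new GroupAgg(-1, 1)` is the second operator instead, they are 1 / 2 / 1 / 3 / 4. These match the two `expected` sequences. Note that the tests compare sorted lists, so they do not check order.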
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972716#comment-15972716 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957812

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -171,6 +181,7 @@ public Row deserialize(DataInputView source) throws IOException {
       result.setField(i, fieldSerializers[i].deserialize(source));
     }
   }
+  result.command = (Command) commandSerializer.deserialize(source);
--- End diff --

Should we use the method with the reuse variable, T deserialize(T reuse, DataInputView source), even though it doesn't actually reuse anything?
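Here is a minimal sketch of the serializer-side change. It assumes `java.io.DataOutput`/`DataInput` as stand-ins for Flink's `DataOutputView`/`DataInputView` and a hypothetical `SimpleRow` with String fields. The command is written after the fields as its ordinal, and the non-reuse `deserialize` delegates to the reuse variant so that the command logic lives in one place. Enum constants are immutable singletons, so "reusing" the command only means assigning the decoded constant, which is consistent with the reviewer's remark. The per-field null flag is a simplification of the null mask the real serializer keeps. The shared lookup table is `static final` and upper-case, following the advice given further down this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

enum Command { Add, Delete }

// Hypothetical mutable row used only in this sketch.
final class SimpleRow {
    final String[] fields;
    Command command = Command.Add;

    SimpleRow(int arity) {
        this.fields = new String[arity];
    }
}

final class SimpleRowSerializer {
    // Shared, immutable lookup table from ordinal to constant.
    private static final Command[] COMMANDS = Command.values();

    private final int arity;

    SimpleRowSerializer(int arity) {
        this.arity = arity;
    }

    void serialize(SimpleRow row, DataOutput target) throws IOException {
        for (String field : row.fields) {
            target.writeBoolean(field == null); // per-field null flag
            if (field != null) {
                target.writeUTF(field);
            }
        }
        target.writeByte(row.command.ordinal()); // command goes after the fields
    }

    SimpleRow deserialize(DataInput source) throws IOException {
        return deserialize(new SimpleRow(arity), source); // delegate to the reuse variant
    }

    SimpleRow deserialize(SimpleRow reuse, DataInput source) throws IOException {
        for (int i = 0; i < arity; i++) {
            reuse.fields[i] = source.readBoolean() ? null : source.readUTF();
        }
        reuse.command = COMMANDS[source.readByte()];
        return reuse;
    }

    // Serializes a row to bytes and reads it back into 'reuse'.
    SimpleRow roundTrip(SimpleRow row, SimpleRow reuse) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(row, new DataOutputStream(bytes));
            return deserialize(reuse, new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```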
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972711#comment-15972711 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958512

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---
@@ -135,11 +136,20 @@ trait CommonCorrelate {
 }
 val outerResultExpr = functionGenerator.generateResultExpression(
   input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
+
+val retractionProcess =
--- End diff --

See CommonCalc.scala: L56.
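For the correlate case, "retain the Delete property" means that every row produced from an input row carries that input's command. This includes the null-padded row of a left outer join when the table function returns nothing. The following plain-Java sketch illustrates that rule. It uses hypothetical names (`CRow`, `CorrelateSketch.correlate`), and a `java.util.function.Function` stands in for the user-defined table function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

enum Command { Add, Delete }

// Hypothetical row type: field values plus the Command carried along the pipeline.
final class CRow {
    final Command command;
    final List<Object> fields;

    CRow(Command command, List<Object> fields) {
        this.command = command;
        this.fields = fields;
    }
}

final class CorrelateSketch {
    // Joins one input row with the rows returned by a table function.
    // Every output row inherits the input's Command, so a Delete input yields only Delete outputs.
    static List<CRow> correlate(CRow in, Function<List<Object>, List<List<Object>>> tableFunction,
                                int functionArity, boolean leftOuter) {
        List<CRow> out = new ArrayList<>();
        List<List<Object>> produced = tableFunction.apply(in.fields);
        for (List<Object> right : produced) {
            List<Object> joined = new ArrayList<>(in.fields);
            joined.addAll(right);
            out.add(new CRow(in.command, joined));
        }
        if (produced.isEmpty() && leftOuter) {
            // Left outer join: pad the function side with nulls, still keeping the input's Command.
            List<Object> padded = new ArrayList<>(in.fields);
            padded.addAll(Collections.nCopies(functionArity, null));
            out.add(new CRow(in.command, padded));
        }
        return out;
    }
}
```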
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972715#comment-15972715 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958057

--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -46,6 +46,9 @@
 /** The array to store actual values. */
 private final Object[] fields;
+/** Indicate to add or delete this row */
--- End diff --

I prefer "Indicates that the line is add or delete type".
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972712#comment-15972712 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957675

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -40,6 +42,7 @@
 private static final long serialVersionUID = 1L;
 private final boolean[] nullMask;
 private final TypeSerializer[] fieldSerializers;
+private final EnumSerializer commandSerializer = new EnumSerializer(Command.class);
--- End diff --

Advice: non-static final members should be initialized in the constructor; otherwise, make the member 'static final' and use an upper-case variable name.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972720#comment-15972720 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111957849

--- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowSerializer.java ---
@@ -198,6 +209,7 @@ public Row deserialize(Row reuse, DataInputView source) throws IOException {
       }
     }
   }
+  reuse.command = (Command) commandSerializer.deserialize(source);
--- End diff --

Same as L184.
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972721#comment-15972721 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958339

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
@@ -52,10 +53,18 @@ trait CommonCalc {
   rowType.getFieldNames,
   expandedExpressions)
+val retractionProcess =
--- End diff --

For a consistent code style, either rename retractionProcess to retractionProcessCode or move this part into CodeGenerator. I prefer the latter.
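One hedged reading of the second option is to keep the snippet that copies the command from the input row to the output row in a single code-generator helper. CommonCalc and CommonCalc's sibling CommonCorrelate would then call that helper instead of each building the string. The helper name, its signature and the generated statement below are illustrative assumptions: they assume the public `command` field from this PR's Row, and they are not Flink's CodeGenerator API.

```java
// Hypothetical code-generation helper; the terms are variable names used in the generated code.
final class RetractionCodeGen {
    private RetractionCodeGen() {}

    // Statement that forwards the input row's command to the output row.
    static String copyCommand(String inputTerm, String outputTerm) {
        return outputTerm + ".command = " + inputTerm + ".command;";
    }

    // Appends the forwarding statement to a node-specific body (e.g. a projection).
    static String withRetraction(String body, String inputTerm, String outputTerm) {
        return body + "\n" + copyCommand(inputTerm, outputTerm);
    }
}
```

Keeping the string in one place means that a later change to how the command is stored only touches the helper, not every plan node.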
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972722#comment-15972722 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959594

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -84,16 +109,38 @@ class GroupAggProcessFunction(
 }
 // Set aggregate result to the final output
-i = 0
-while (i < aggregates.length) {
-  val index = groupings.length + i
-  val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
-  aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
-  output.setField(index, aggregates(i).getValue(accumulator))
-  i += 1
+if (input.command == Command.Delete) {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).retract(accumulator, input.getField(aggFields(i)(0)))
+    output.setField(index, aggregates(i).getValue(accumulator))
+    i += 1
+  }
+} else {
+  i = 0
+  while (i < aggregates.length) {
+    val index = groupings.length + i
+    val accumulator = accumulators.getField(i).asInstanceOf[Accumulator]
+    aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0)))
+    output.setField(index, aggregates(i).getValue(accumulator))
+    i += 1
+  }
 }
-state.update(accumulators)
+// if previous is not null, do retraction process
+if (null != previous) {
+  if (previous.equals(output)) {
+    // ignore same output
+    return
--- End diff --

Is there any case in which the 'same' row needs to be output?
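The two branches in this hunk differ only in whether each aggregate calls `accumulate` or `retract`. The minimal Java sketch below shows the same logic in a single loop in which the input's command selects the method. It assumes a hypothetical `Agg` interface with `long[]` accumulators; Flink's aggregate functions use their own accumulator types.

```java
enum Command { Add, Delete }

// Hypothetical aggregate contract; long[] stands in for a real accumulator type.
interface Agg {
    long[] createAccumulator();
    void accumulate(long[] acc, long value);
    void retract(long[] acc, long value);
    long getValue(long[] acc);
}

final class SumAgg implements Agg {
    public long[] createAccumulator() { return new long[1]; }
    public void accumulate(long[] acc, long value) { acc[0] += value; }
    public void retract(long[] acc, long value) { acc[0] -= value; }
    public long getValue(long[] acc) { return acc[0]; }
}

final class CountAgg implements Agg {
    public long[] createAccumulator() { return new long[1]; }
    public void accumulate(long[] acc, long value) { acc[0]++; }
    public void retract(long[] acc, long value) { acc[0]--; }
    public long getValue(long[] acc) { return acc[0]; }
}

final class AggStep {
    // Single loop: the input's command only decides accumulate vs. retract.
    static long[] update(Agg[] aggs, long[][] accs, long[] inputs, Command command) {
        long[] output = new long[aggs.length];
        for (int i = 0; i < aggs.length; i++) {
            if (command == Command.Delete) {
                aggs[i].retract(accs[i], inputs[i]);
            } else {
                aggs[i].accumulate(accs[i], inputs[i]);
            }
            output[i] = aggs[i].getValue(accs[i]);
        }
        return output;
    }
}
```

The sketch only computes the new values. Whether an unchanged result should be suppressed is the question raised in the review, and here that decision is left to the caller.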
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972724#comment-15972724 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959203

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -129,11 +131,17 @@ class DataStreamOverAggregate(
   generator,
   inputDS,
   isRowTimeType = false,
-  isRowsClause = overWindow.isRows)
+  isRowsClause = overWindow.isRows,
+  consumeRetraction)
 } else if (
   overWindow.lowerBound.isPreceding && !overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   // bounded OVER window
+  if (consumeRetraction) {
+    throw new TableException(
+      "Retraction for bounded over window is not supported yet. Note: Currently, bounded " +
--- End diff --

1. "Retraction for" -> "Retraction on"
2. groupby -> groupBy
3. Keep the term consistent: choose either 'OVER' or 'over' and use it throughout the same context.
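The support matrix from the issue description can be encoded as a single check that runs before an operator is built. This is a hypothetical sketch: `AggKind`, `RetractionSupport` and the message text are illustrative, and `UnsupportedOperationException` stands in for the `TableException` that the PR throws. The message follows the wording suggested in the review ("Retraction on ...").

```java
// Hypothetical classification of the aggregation operators mentioned in the issue.
enum AggKind { UNBOUNDED_GROUP_BY, GROUP_WINDOW, OVER_WINDOW }

final class RetractionSupport {
    private RetractionSupport() {}

    // Supported: unbounded groupBy; unbounded, processing-time over window.
    // Unsupported: group window; event-time or bounded over window.
    static boolean consumesRetraction(AggKind kind, boolean bounded, boolean eventTime) {
        switch (kind) {
            case UNBOUNDED_GROUP_BY:
                return true;
            case OVER_WINDOW:
                return !bounded && !eventTime;
            default:
                return false;
        }
    }

    static void validate(AggKind kind, boolean bounded, boolean eventTime) {
        if (!consumesRetraction(kind, bounded, eventTime)) {
            throw new UnsupportedOperationException(
                "Retraction on " + (bounded ? "bounded " : "") + (eventTime ? "event-time " : "")
                    + kind + " is not supported yet.");
        }
    }
}
```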
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972713#comment-15972713 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111958225

--- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java ---
@@ -46,6 +46,9 @@
 /** The array to store actual values. */
 private final Object[] fields;
+/** Indicate to add or delete this row */
+public Command command = Command.Add;
--- End diff --

I think making it private and offering a getter/setter would be better.
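Here is a sketch of the suggested encapsulation. It uses a hypothetical `EncapsulatedRow` rather than Flink's `Row`. The command becomes private, defaults to Add, and can only be changed through a setter, which also gives a single place to reject invalid values such as null. The accessor names are assumptions for illustration.

```java
import java.util.Objects;

enum Command { Add, Delete }

final class EncapsulatedRow {
    private final Object[] fields;
    private Command command = Command.Add; // a new row is a normal (Add) row

    EncapsulatedRow(int arity) {
        this.fields = new Object[arity];
    }

    public Command getCommand() {
        return command;
    }

    public void setCommand(Command command) {
        this.command = Objects.requireNonNull(command, "command must not be null");
    }

    public Object getField(int pos) {
        return fields[pos];
    }

    public void setField(int pos, Object value) {
        fields[pos] = value;
    }
}
```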
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972718#comment-15972718 ]

ASF GitHub Bot commented on FLINK-6091:
---

Github user lincoln-lil commented on a diff in the pull request:

https://github.com/apache/flink/pull/3733#discussion_r111959451

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---
@@ -68,12 +70,35 @@ class GroupAggProcessFunction(
 var accumulators = state.value()
 if (null == accumulators) {
+  previous = null
   accumulators = new Row(aggregates.length)
   i = 0
   while (i < aggregates.length) {
     accumulators.setField(i, aggregates(i).createAccumulator())
     i += 1
   }
+} else {
+  // get previous row
+  if (generateRetraction) {
+    if (null == previous) {
+      previous = new Row(groupings.length + aggregates.length)
+      // previous is used to output retract message, so command of previous will always be
+      // Command.Delete
+      previous.command = Command.Delete
+    }
+    i = 0
+    while (i < groupings.length) {
+      previous.setField(i, input.getField(groupings(i)))
+      i += 1
+    }
+    i = 0
+    while (i < aggregates.length) {
+      val index = groupings.length + i
--- End diff --

Drop the local variable:

previous.setField(groupings.length + i, aggregates(i).getValue(accumulator))

The same applies to L118 and L127.
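In this hunk, the `previous` row consists of the grouping keys followed by one value per aggregate, and it is always marked as Delete. The following plain-Java sketch builds that layout with the index computed inline, as the review suggests. `PreviousRow` and `build` are hypothetical, and the aggregate results are passed in as already-computed values.

```java
import java.util.Arrays;

enum Command { Add, Delete }

final class PreviousRow {
    final Object[] fields;
    final Command command = Command.Delete; // previous results are only ever retracted

    private PreviousRow(Object[] fields) {
        this.fields = fields;
    }

    // Layout: [key_0 .. key_{g-1}, agg_0 .. agg_{a-1}]
    static PreviousRow build(Object[] input, int[] groupings, Object[] aggValues) {
        Object[] fields = new Object[groupings.length + aggValues.length];
        for (int i = 0; i < groupings.length; i++) {
            fields[i] = input[groupings[i]];
        }
        for (int i = 0; i < aggValues.length; i++) {
            fields[groupings.length + i] = aggValues[i]; // no separate 'index' variable
        }
        return new PreviousRow(fields);
    }

    @Override
    public String toString() {
        return command + Arrays.toString(fields);
    }
}
```

For example, `PreviousRow.build(new Object[] {"Hello", 1}, new int[] {0}, new Object[] {1L})` prints as `Delete[Hello, 1]`.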
[jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
[ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972587#comment-15972587 ]

ASF GitHub Bot commented on FLINK-6091:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/3733

[FLINK-6091] [table] Implement and turn on retraction for aggregates

Implement functions for generating and consuming retract messages for different aggregates.

1. Add a delete/add property to Row.
2. Implement functions for generating retract messages for unbounded groupBy.
3. Implement functions for handling retract messages for different aggregates.
4. Handle retraction messages in `CommonCorrelate` and `CommonCalc` (retain the Delete property).

Currently, only unbounded groupBy generates retractions, and it works only in unbounded, processing-time mode. Hence, retraction is so far supported only for unbounded, processing-time aggregations. More retraction support can be added later.

Supported now: unbounded groupBy, unbounded processing-time over window.
Unsupported now: group window, event-time over window, bounded over window.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink FLINK-6091-PR

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3733.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3733

commit 911e4516d3d5a17354c75d67e73153aa9194212b
Author: Hequn Cheng
Date: 2017-04-18T08:54:09Z

[FLINK-6091] [table] Implement and turn on retraction for aggregates