[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108352888

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Updated JIRA [FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658). @sunjincheng121, not sure if that was an issue with your permissions. It is not possible to modify a closed issue. You need to reopen it first.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108352300

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Oh, I just recalled why I hadn't added event-time OVER ROWS windows. They cannot deal with late data, because all results would need to be shifted if a late record were inserted. I would propose to keep this implementation anyway and just throw an exception if a user enables handling of late data. What do you think, @hongyuhong and @sunjincheng121?
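To make the late-data concern concrete, here is a minimal sketch in plain Scala. It does not use any Flink API; `LateRecordShift` and `runningSum` are hypothetical names, and a running sum over `(rowtime, value)` pairs stands in for an unbounded event-time `ROWS` aggregate.

```scala
object LateRecordShift {
  // Hypothetical stand-in for an unbounded ROWS aggregate:
  // sort by event time, then emit one running sum per row.
  def runningSum(rows: Seq[(Long, Int)]): Seq[Int] =
    rows.sortBy(_._1).map(_._2).scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    // Results already emitted for rowtimes 1, 3, 4: 1, 4, 8
    val onTime = Seq((1L, 1), (3L, 3), (4L, 4))
    println(runningSum(onTime).mkString(","))

    // A late record with rowtime 2 arrives afterwards.
    // Recomputing gives 1, 3, 6, 10: the results for rowtimes 3 and 4
    // would have to change from 4, 8 to 6, 10.
    val withLate = onTime :+ ((2L, 2))
    println(runningSum(withLate).mkString(","))
  }
}
```

Every row after the insertion point gets a different result, which is why rejecting late-data handling for this window type looks like the simpler option.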
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108314395

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Hi @fhueske, I have created a new JIRA, [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200), for the `RANGE` case. But I do not have permission to change this JIRA's name, so could you help me do it? :)
Hi @hongyuhong, I'm glad to hear that you want to continue with the `RANGE` case. Feel free to take [FLINK-6200](https://issues.apache.org/jira/browse/FLINK-6200) if you want.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108030107

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Yes, renaming the JIRA and opening a new one for the `RANGE` case would be great! Thanks
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108026230

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Hi @sunjincheng121, thanks for the reminder. I am glad to supplement it.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r108025485

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

No problem. So, can I change this JIRA's title and open a new JIRA to address the `RANGE` case? @fhueske
Thanks,
SunJincheng
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r107987832

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

I see, so this PR addresses the `ROWS` case. I'll push out a hotfix. Thanks for the notification @sunjincheng121!
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r107982106

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -112,7 +113,12 @@ class DataStreamOverAggregate(
                 "condition.")
           }
       case _: RowTimeType =>
-        throw new TableException("OVER Window of the EventTime type is not currently supported.")
+        if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
+          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
--- End diff --

Event-time OVER windows need to treat the `ROWS` and `RANGE` clauses separately, because:
- ROWS specifies the window in physical units (rows).
- RANGE specifies the window as a logical offset.

They have different semantics, for example:

DATA:
```
(long, int, String)
(1L, 1, "H")
(2L, 2, "H")
(2L, 3, "H")
```
ROWS sum(b) result: `1,3,6`
RANGE sum(b) result: `1,6,6`
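The difference can be reproduced with a minimal sketch in plain Scala, assuming the first field is the rowtime, the second is `b`, and the input is already sorted by rowtime. The names `RowsVsRange`, `rowsSum`, and `rangeSum` are hypothetical and not part of the PR.

```scala
object RowsVsRange {
  // (rowtime, b, c), matching the sample data in the comment above.
  type Record = (Long, Int, String)

  // ROWS: the frame ends at the current physical row,
  // so each row sees only the rows up to and including itself.
  def rowsSum(sorted: Seq[Record]): Seq[Int] =
    sorted.map(_._2).scanLeft(0)(_ + _).tail

  // RANGE: the frame ends at the current row's rowtime value,
  // so all rows sharing that rowtime are included (peers get the same result).
  def rangeSum(sorted: Seq[Record]): Seq[Int] =
    sorted.map { case (ts, _, _) =>
      sorted.filter(_._1 <= ts).map(_._2).sum
    }

  def main(args: Array[String]): Unit = {
    val data = Seq((1L, 1, "H"), (2L, 2, "H"), (2L, 3, "H"))
    println(rowsSum(data).mkString(","))  // 1,3,6
    println(rangeSum(data).mkString(",")) // 1,6,6
  }
}
```

The two rows with rowtime `2L` are peers under `RANGE`, so both get the full sum of 6, while `ROWS` keeps them apart.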
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3386
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r107719825

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{ArrayList, LinkedList, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.{ListTypeInfo}
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the filed index which the aggregate functions use
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row tye which the state saved
+ * @param inputType the input row tye which the state saved
+ *
+ */
+class UnboundedEventTimeOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val intermediateType: TypeInformation[Row],
+    private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var sortList: LinkedList[Long] = _
+
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    sortList = new LinkedList[Long]()
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+    accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  /**
+    * Process one element from the input stream, not emit the output
+    *
+    * @param input The input value.
+    * @param ctx The ctx to register timer or get current time
+    * @param out The collector for returning result values.
+    *
+    */
+  override def processElement(
+     input: Row,
+     ctx: ProcessFunction[Row, Row]#Context,
+     out: Collector[Row]): Unit = {
+
+    // discard later record
+    if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+      // ensure every key just register on timer
+      ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+      val rowList =
+        if (rowMapState.contains(ctx.timestamp)) rowMapState.get(ctx.timestamp)
+        else new ArrayList[Row]()
+      rowList.add(input)
+      rowMapState.put(ctx.timestamp, rowList)
+    }
+  }
+
+  /**
+    * Called when a timer set fires, sort current records a
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3386#discussion_r107712202

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util.{ArrayList, LinkedList, List => JList}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.java.typeutils.{ListTypeInfo}
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+ * A ProcessFunction to support unbounded event-time over-window
+ *
+ * @param aggregates the aggregate functions
+ * @param aggFields the filed index which the aggregate functions use
+ * @param forwardedFieldCount the input fields count
+ * @param intermediateType the intermediate row tye which the state saved
+ * @param inputType the input row tye which the state saved
+ *
+ */
+class UnboundedEventTimeOverProcessFunction(
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int,
+    private val intermediateType: TypeInformation[Row],
+    private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var sortList: LinkedList[Long] = _
+
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    sortList = new LinkedList[Long]()
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("accumulatorstate", intermediateType)
+    accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+  }
+
+  /**
+    * Process one element from the input stream, not emit the output
+    *
+    * @param input The input value.
+    * @param ctx The ctx to register timer or get current time
+    * @param out The collector for returning result values.
+    *
+    */
+  override def processElement(
+     input: Row,
+     ctx: ProcessFunction[Row, Row]#Context,
+     out: Collector[Row]): Unit = {
+
+    // discard later record
+    if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+      // ensure every key just register on timer
+      ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+      val rowList =
+        if (rowMapState.contains(ctx.timestamp)) rowMapState.get(ctx.timestamp)
--- End diff --

we can directly call `get()` and check for `null`. This will save one call to the state backend.
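A minimal sketch of the suggested pattern follows. To keep it runnable without Flink, a `java.util.HashMap` stands in for `rowMapState` and `String` stands in for `Row`; both substitutions are assumptions for illustration, as are the names `SingleLookup` and `addRow`. The sketch relies on `get()` returning `null` for a missing key, which is what the review comment assumes for the map state as well.

```scala
import java.util.{ArrayList, HashMap, List => JList}

object SingleLookup {
  // `state` is a hypothetical stand-in for rowMapState.
  def addRow(state: HashMap[Long, JList[String]], timestamp: Long, row: String): Unit = {
    // One lookup instead of contains() followed by get().
    var rowList = state.get(timestamp)
    if (rowList == null) {
      // No rows buffered for this timestamp yet.
      rowList = new ArrayList[String]()
    }
    rowList.add(row)
    state.put(timestamp, rowList)
  }

  def main(args: Array[String]): Unit = {
    val state = new HashMap[Long, JList[String]]()
    addRow(state, 5L, "a")
    addRow(state, 5L, "b")
    println(state.get(5L).size) // 2
  }
}
```

With a state backend where each access may involve serialization, dropping the `contains()` call removes one round trip per input record.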
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107611456 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current ti
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107609421 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "max(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "min(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row) " + + "from T1" + +val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, Long, String)] { + override def run(ctx: SourceContext[(Int, Long, String)]): Unit = { +ctx.collectWithTimestamp((1, 1L, "Hi"), 1405L) +ctx.collectWithTimestamp((2, 1L, "Hello"), 1400L) +ctx.collectWithTimestamp((3, 1L, "Hello"), 1402L) +ctx.collectWithTimestamp((1, 2L, "Hello"), 1403L) +ctx.collectWithTimestamp((1, 3L, "Hello world"), 1404L) +ctx.collectWithTimestamp((3, 2L, "Hello world"), 1407L) +ctx.collectWithTimestamp((2, 2L, "Hello world"), 1408L) +ctx.emitWatermark(new Watermark(1410L)) 
+ctx.collectWithTimestamp((1, 4L, "Hello world"), 1408L) +ctx.collectWithTimestamp((2, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((3, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((1, 5L, "Hello world"), 1412L) +ctx.emitWatermark(new Watermark(1420L)) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1421L) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1419L) +ctx.collectWithTimestamp((2, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((3, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((2, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((3, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((1, 7L, "Hello world"), 1424L) +ctx.collectWithTimestamp((1, 8L, "Hello world"), 1423L) +ctx.collectWithTimestamp((1, 9L, "Hello world"), 1421L) +ctx.emitWatermark(new Watermark(1430L)) + } + + override def cancel(): Unit = {} +}).toTable(tEnv).as('a, 'b, 'c) + +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,2,Hello,2,1,2,2,2", + "1,3,Hello world,5,2,2,3,2", + "1,1,Hi,6,3,2,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,3,2,1,2,1", + "3,1,Hello,1,1,1,1,1", + "3,2,Hello world,3,2,1,2,1", + "1,5,Hello world,11,4,2,5,1", + "1,6,Hello world,17,5,3,6,1", + "1,9,Hello world,26,6,4,9,1", + "1,8,Hello world,34,7,4,9,1", + "1,7,Hello world,41,8,5,9,1", + "2,5,Hello world,8,3,2,5,1", + "3,5,Hello world,8,3,2,5,1" +) +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() 
+env.setParallelism(1) --- End diff -- I think you are right. If we have a parallelism of > 1, th
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107597005 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107585318 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get cur
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107584162 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 --- End diff -- This is a reply to the MapState comment; I don't know why I can't reply to that comment directly. If we use MapState, we have to deserialize and serialize the list every time. I don't know whether that costs less than the list add operation (RocksDB merge). Also, will the map iterator return the entries in the order in which we put them into the map?
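The ordering question above is left open in this thread. One way to avoid depending on the answer is to sort the buffered timestamps explicitly before emitting. The sketch below is an illustration under assumptions, not the PR's code: a plain `java.util.HashMap` stands in for the state (its iteration order is unspecified), rows are plain strings, and `emitInTimestampOrder` is a hypothetical helper name.

```scala
import java.util.{ArrayList, HashMap, List => JList}
import scala.collection.JavaConverters._

object SortedEmitSketch {
  // Hypothetical helper: returns the buffered rows in ascending timestamp order,
  // regardless of the order in which the map happens to iterate.
  def emitInTimestampOrder(buffer: HashMap[Long, JList[String]]): Seq[String] = {
    // Collect and sort the keys first, then look up each row list by key.
    val timestamps = buffer.keySet().asScala.toSeq.sorted
    timestamps.flatMap(ts => buffer.get(ts).asScala)
  }
}
```

The price is one sort per firing timer; in exchange, the output order no longer depends on the map implementation behind the state.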
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107583714 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "max(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "min(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row) " + + "from T1" + +val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, Long, String)] { + override def run(ctx: SourceContext[(Int, Long, String)]): Unit = { +ctx.collectWithTimestamp((1, 1L, "Hi"), 1405L) +ctx.collectWithTimestamp((2, 1L, "Hello"), 1400L) +ctx.collectWithTimestamp((3, 1L, "Hello"), 1402L) +ctx.collectWithTimestamp((1, 2L, "Hello"), 1403L) +ctx.collectWithTimestamp((1, 3L, "Hello world"), 1404L) +ctx.collectWithTimestamp((3, 2L, "Hello world"), 1407L) +ctx.collectWithTimestamp((2, 2L, "Hello world"), 1408L) +ctx.emitWatermark(new 
Watermark(1410L)) +ctx.collectWithTimestamp((1, 4L, "Hello world"), 1408L) +ctx.collectWithTimestamp((2, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((3, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((1, 5L, "Hello world"), 1412L) +ctx.emitWatermark(new Watermark(1420L)) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1421L) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1419L) +ctx.collectWithTimestamp((2, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((3, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((2, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((3, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((1, 7L, "Hello world"), 1424L) +ctx.collectWithTimestamp((1, 8L, "Hello world"), 1423L) +ctx.collectWithTimestamp((1, 9L, "Hello world"), 1421L) +ctx.emitWatermark(new Watermark(1430L)) + } + + override def cancel(): Unit = {} +}).toTable(tEnv).as('a, 'b, 'c) + +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,2,Hello,2,1,2,2,2", + "1,3,Hello world,5,2,2,3,2", + "1,1,Hi,6,3,2,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,3,2,1,2,1", + "3,1,Hello,1,1,1,1,1", + "3,2,Hello world,3,2,1,2,1", + "1,5,Hello world,11,4,2,5,1", + "1,6,Hello world,17,5,3,6,1", + "1,9,Hello world,26,6,4,9,1", + "1,8,Hello world,34,7,4,9,1", + "1,7,Hello world,41,8,5,9,1", + "2,5,Hello world,8,3,2,5,1", + "3,5,Hello world,8,3,2,5,1" +) +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() 
+env.setParallelism(1) --- End diff -- Hi @fhueske, i think if we just set the source of paral
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107385159 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "max(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "min(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row) " + + "from T1" + +val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, Long, String)] { + override def run(ctx: SourceContext[(Int, Long, String)]): Unit = { +ctx.collectWithTimestamp((1, 1L, "Hi"), 1405L) +ctx.collectWithTimestamp((2, 1L, "Hello"), 1400L) +ctx.collectWithTimestamp((3, 1L, "Hello"), 1402L) +ctx.collectWithTimestamp((1, 2L, "Hello"), 1403L) +ctx.collectWithTimestamp((1, 3L, "Hello world"), 1404L) +ctx.collectWithTimestamp((3, 2L, "Hello world"), 1407L) +ctx.collectWithTimestamp((2, 2L, "Hello world"), 1408L) +ctx.emitWatermark(new Watermark(1410L)) 
+ctx.collectWithTimestamp((1, 4L, "Hello world"), 1408L) +ctx.collectWithTimestamp((2, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((3, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((1, 5L, "Hello world"), 1412L) +ctx.emitWatermark(new Watermark(1420L)) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1421L) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1419L) +ctx.collectWithTimestamp((2, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((3, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((2, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((3, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((1, 7L, "Hello world"), 1424L) +ctx.collectWithTimestamp((1, 8L, "Hello world"), 1423L) +ctx.collectWithTimestamp((1, 9L, "Hello world"), 1421L) +ctx.emitWatermark(new Watermark(1430L)) + } + + override def cancel(): Unit = {} +}).toTable(tEnv).as('a, 'b, 'c) + +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,2,Hello,2,1,2,2,2", + "1,3,Hello world,5,2,2,3,2", + "1,1,Hi,6,3,2,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,3,2,1,2,1", + "3,1,Hello,1,1,1,1,1", + "3,2,Hello world,3,2,1,2,1", + "1,5,Hello world,11,4,2,5,1", + "1,6,Hello world,17,5,3,6,1", + "1,9,Hello world,26,6,4,9,1", + "1,8,Hello world,34,7,4,9,1", + "1,7,Hello world,41,8,5,9,1", + "2,5,Hello world,8,3,2,5,1", + "3,5,Hello world,8,3,2,5,1" +) +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() 
+env.setParallelism(1) --- End diff -- The test should also work correctly if we only set the par
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107392253 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "max(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "min(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row) " + + "from T1" + +val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, Long, String)] { + override def run(ctx: SourceContext[(Int, Long, String)]): Unit = { +ctx.collectWithTimestamp((1, 1L, "Hi"), 1405L) +ctx.collectWithTimestamp((2, 1L, "Hello"), 1400L) +ctx.collectWithTimestamp((3, 1L, "Hello"), 1402L) +ctx.collectWithTimestamp((1, 2L, "Hello"), 1403L) +ctx.collectWithTimestamp((1, 3L, "Hello world"), 1404L) +ctx.collectWithTimestamp((3, 2L, "Hello world"), 1407L) +ctx.collectWithTimestamp((2, 2L, "Hello world"), 1408L) +ctx.emitWatermark(new Watermark(1410L)) 
+ctx.collectWithTimestamp((1, 4L, "Hello world"), 1408L) --- End diff -- Please indicate which records are late and will be dropped.
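As an aid for annotating the test, the following sketch replays the test's input and applies the same condition as `processElement` in the diff (`timestamp >= currentWatermark`). It is a plain simulation with hypothetical names (`Record`, `Wm`, `split`) and assumes that records and watermarks reach the operator in exactly the order the source emits them, which is what a parallelism of 1 is meant to ensure.

```scala
object LateRecordSketch {
  sealed trait Event
  final case class Record(key: Int, value: Long, ts: Long) extends Event
  final case class Wm(ts: Long) extends Event

  // Splits the records into (kept, dropped): a record is kept only if its
  // timestamp is not smaller than the watermark seen so far.
  def split(events: Seq[Event]): (Seq[Record], Seq[Record]) = {
    var watermark = Long.MinValue
    val kept = Seq.newBuilder[Record]
    val dropped = Seq.newBuilder[Record]
    events.foreach {
      case Wm(ts) => watermark = math.max(watermark, ts)
      case r: Record => if (r.ts >= watermark) kept += r else dropped += r
    }
    (kept.result(), dropped.result())
  }

  // Input copied from the test in the diff (the string column is omitted).
  val events: Seq[Event] = Seq(
    Record(1, 1L, 1405L), Record(2, 1L, 1400L), Record(3, 1L, 1402L),
    Record(1, 2L, 1403L), Record(1, 3L, 1404L), Record(3, 2L, 1407L),
    Record(2, 2L, 1408L), Wm(1410L),
    Record(1, 4L, 1408L), Record(2, 3L, 1408L), Record(3, 3L, 1408L),
    Record(1, 5L, 1412L), Wm(1420L),
    Record(1, 6L, 1421L), Record(1, 6L, 1419L), Record(2, 4L, 1418L),
    Record(3, 4L, 1418L), Record(2, 5L, 1422L), Record(3, 5L, 1422L),
    Record(1, 7L, 1424L), Record(1, 8L, 1423L), Record(1, 9L, 1421L),
    Wm(1430L))
}
```

Under these assumptions the simulation drops six records: the three with timestamp 1408 that arrive after watermark 1410, and the ones with timestamps 1419, 1418 and 1418 that arrive after watermark 1420. The remaining 14 match the number of rows in the test's `expected` list.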
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107388109 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current ti
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107390469 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current ti
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107389572 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current ti
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107383711 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,33 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedEventTimeOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( --- End diff -- Method names should start with a lowercase letter: `createUnboundedEventTimeOverProcessFunction`.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107385173 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "max(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "min(b) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row) " + + "from T1" + +val t1 = env.addSource[(Int, Long, String)](new SourceFunction[(Int, Long, String)] { + override def run(ctx: SourceContext[(Int, Long, String)]): Unit = { +ctx.collectWithTimestamp((1, 1L, "Hi"), 1405L) +ctx.collectWithTimestamp((2, 1L, "Hello"), 1400L) +ctx.collectWithTimestamp((3, 1L, "Hello"), 1402L) +ctx.collectWithTimestamp((1, 2L, "Hello"), 1403L) +ctx.collectWithTimestamp((1, 3L, "Hello world"), 1404L) +ctx.collectWithTimestamp((3, 2L, "Hello world"), 1407L) +ctx.collectWithTimestamp((2, 2L, "Hello world"), 1408L) +ctx.emitWatermark(new Watermark(1410L)) 
+ctx.collectWithTimestamp((1, 4L, "Hello world"), 1408L) +ctx.collectWithTimestamp((2, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((3, 3L, "Hello world"), 1408L) +ctx.collectWithTimestamp((1, 5L, "Hello world"), 1412L) +ctx.emitWatermark(new Watermark(1420L)) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1421L) +ctx.collectWithTimestamp((1, 6L, "Hello world"), 1419L) +ctx.collectWithTimestamp((2, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((3, 4L, "Hello world"), 1418L) +ctx.collectWithTimestamp((2, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((3, 5L, "Hello world"), 1422L) +ctx.collectWithTimestamp((1, 7L, "Hello world"), 1424L) +ctx.collectWithTimestamp((1, 8L, "Hello world"), 1423L) +ctx.collectWithTimestamp((1, 9L, "Hello world"), 1421L) +ctx.emitWatermark(new Watermark(1430L)) + } + + override def cancel(): Unit = {} +}).toTable(tEnv).as('a, 'b, 'c) + +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,2,Hello,2,1,2,2,2", + "1,3,Hello world,5,2,2,3,2", + "1,1,Hi,6,3,2,3,1", + "2,1,Hello,1,1,1,1,1", + "2,2,Hello world,3,2,1,2,1", + "3,1,Hello,1,1,1,1,1", + "3,2,Hello world,3,2,1,2,1", + "1,5,Hello world,11,4,2,5,1", + "1,6,Hello world,17,5,3,6,1", + "1,9,Hello world,26,6,4,9,1", + "1,8,Hello world,34,7,4,9,1", + "1,7,Hello world,41,8,5,9,1", + "2,5,Hello world,8,3,2,5,1", + "3,5,Hello world,8,3,2,5,1" +) +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testUnboundedEventTimeRowWindowWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() 
+env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(b) over (order by ro
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107386829 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ --- End diff -- @hongyuhong, what do you think about using `MapState[Long, List[Row]]` here instead of `ListState[Tuple2[Long, Row]]`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107387246 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 --- End diff -- Can we change this to `import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}` to indicate that this is not a Scala Tuple2? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
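The rename requested in this comment relies on Scala's import selector syntax. As a minimal sketch, the same pattern is shown here with `java.util` classes instead of Flink's `Tuple2`, so that it runs without Flink on the classpath; `ImportRenameDemo` and `toJava` are hypothetical names used only for illustration.

```scala
// Rename Java types on import so they cannot be confused with Scala's own List.
import java.util.{ArrayList => JArrayList, List => JList}

object ImportRenameDemo {
  // Copies a Scala List into a Java list; the J prefix marks the Java types.
  def toJava(xs: List[String]): JList[String] = {
    val out: JList[String] = new JArrayList[String]()
    xs.foreach(x => out.add(x))
    out
  }

  def main(args: Array[String]): Unit = {
    // List here is scala.List, JList is java.util.List.
    println(toJava(List("a", "b")).size())
  }
}
```

With the alias in place, every use site states which of the two types is meant, which is the point of the `JTuple2` request.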
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107388771 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("accumulatorstate", intermediateType) +accumulatorState = 
getRuntimeContext.getState[Row](stateDescriptor) + +val tuple2Type: TypeInformation[Tuple2[Long, Row]] = + new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType) +.asInstanceOf[TypeInformation[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tuple2Type) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param input The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current records according the timestamp +* and emit the output +* +* @param timestamp The timestamp of the firing timer. +* @param ctx The ctx to register timer or get current ti
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107385110 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,193 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) --- End diff -- The test should also work correctly if we only set the parallelism of the source to 1.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107387271 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer --- End diff -- This import is unused.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r107062397 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ --- End diff -- I think `ListState` can not work well for event-time case. 
This is because we must handle out-of-order data. For example, suppose `allowedLateness = 2` (the length of time the user configures as the allowable data delay). Input data:
```
(1L, 1, "Hello"),
(2L, 2, "Hello"),
(4L, 4, "Hello"),       // 4L and 3L must both be handled correctly,
(3L, 3, "Hello"),       // because allowedLateness = 2
(7L, 7, "Hello"),
(7L, 8, "Hello"),
(5L, 5, "Hello"),
(8L, 8, "Hello World"),
(20L, 20, "Hello World"),
(9L, 9, "Hello World")) // 9L can be ignored, because 20L - 9L = 11L > 2
```
So I suggest using `MapState[Long, List[Row]]` and `PriorityQueue[(Long, Long)]` to handle this case. We then need to consider two things: 1. Events that are out of order but not late. 2. Adding an `allowedLateness` config that the user can define. What do you think, @hongyuhong @fhueske?
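The buffering idea in this comment can be sketched in plain Scala without any Flink dependency. This is an illustration under stated assumptions, not the PR's implementation: a sorted `mutable.TreeMap` stands in for `MapState[Long, List[Row]]` plus the priority queue, lateness is measured against the highest timestamp seen so far (following the `20L - 9L` arithmetic in the example), and `Event`, `OrderedRunningSum`, `add`, and `flush` are hypothetical names. A real Flink operator would key off watermarks and timers instead.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Row: (event timestamp, value to aggregate).
final case class Event(timestamp: Long, value: Long)

// Buffers out-of-order events and emits a running SUM in timestamp order.
final class OrderedRunningSum(allowedLateness: Long) {
  // Sorted by timestamp; stands in for MapState[Long, List[Row]] plus a priority queue.
  private val buffer = mutable.TreeMap.empty[Long, Vector[Event]]
  private var seenAny = false
  private var maxSeen = 0L
  private var sum = 0L

  /** Returns the (event, runningSum) pairs that became final; empty if the event was dropped as late. */
  def add(e: Event): Vector[(Event, Long)] = {
    if (seenAny && maxSeen - e.timestamp > allowedLateness) {
      Vector.empty // too late relative to the highest timestamp seen: dropped
    } else {
      maxSeen = if (seenAny) math.max(maxSeen, e.timestamp) else e.timestamp
      seenAny = true
      buffer(e.timestamp) = buffer.getOrElse(e.timestamp, Vector.empty) :+ e
      // Anything older than maxSeen - allowedLateness can no longer be preceded by a new event.
      drain(maxSeen - allowedLateness)
    }
  }

  /** Emits everything still buffered, e.g. at end of input. */
  def flush(): Vector[(Event, Long)] = drain(Long.MaxValue)

  // Removes and aggregates all buffered events with timestamp < bound, in timestamp order.
  private def drain(bound: Long): Vector[(Event, Long)] = {
    val ready = buffer.keysIterator.takeWhile(_ < bound).toVector
    ready.flatMap { ts =>
      buffer.remove(ts).getOrElse(Vector.empty).map { e =>
        sum += e.value
        (e, sum)
      }
    }
  }
}

object OrderedRunningSumDemo {
  def main(args: Array[String]): Unit = {
    // Same (timestamp, value) pairs as the example input in the comment.
    val input = Vector(
      Event(1L, 1L), Event(2L, 2L), Event(4L, 4L), Event(3L, 3L), Event(7L, 7L),
      Event(7L, 8L), Event(5L, 5L), Event(8L, 8L), Event(20L, 20L), Event(9L, 9L))
    val agg = new OrderedRunningSum(allowedLateness = 2L)
    val out = input.flatMap(agg.add) ++ agg.flush()
    out.foreach { case (e, s) => println(s"${e.timestamp},${e.value},$s") }
  }
}
```

In this sketch the `3L` and `5L` events are accepted and emitted in timestamp order, while the `9L` event is dropped because it arrives after `20L`. Keying the buffer by timestamp is what makes the ordered drain cheap, which is the argument for a map-like state over a flat `ListState`.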
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106950646 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106950142 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106949663 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) --- End diff -- Hi @hongyuhong, I completely agree with you. Very good points! Thanks
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106774581 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) --- End diff -- I think that if we register timers using `ctx.timestamp`, it will generate too many timers; if we use `currentWatermark + 1`, then it will remove
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623827 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622821 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106621564 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,35 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType): UnboundedEventTimeOverProcessFunction = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) + +val aggregationStateType: RowTypeInfo = --- End diff -- You can also use `createAccumulatorRowType(inputType, aggregates)` here. By the way, could you refactor the `createAccumulatorRowType(inputType, aggregates)` method and remove the `inputType` parameter? It is not used. Thanks!
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106620612 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -159,6 +167,46 @@ class DataStreamOverAggregate( result } + def createUnboundedAndCurrentRowEventTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val keyedStream = inputDS.keyBy(partitionKeys: _*) +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +keyedStream + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } + // global non-partitioned aggregation + else { +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( + namedAggregates, + inputType) + +inputDS.keyBy(new NullByteKeySelector[Row]) + .process(processFunction) + .setParallelism(1) --- End diff -- Please also call `setMaxParallelism(1)` here to prevent this operator from being scaled out.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625858 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Also, most groups have just a single record, and the maximum is two records. With that we cannot really check whether the sorting works correctly. Can you use fewer groups (fewer distinct `a` values) and add more rows to some groups with out-of-order timestamps?
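To make the request above concrete, here is a small plain-Scala sketch of how expected results for such a test could be derived: few distinct keys, several rows per key, timestamps arriving out of order. It is not the test from the PR; the row layout `(rowtime, key, value)`, the sample data, and the names `ExpectedOverWindowSketch` and `expected` are assumptions made for illustration. Timestamps are distinct within each key, so ROWS and RANGE semantics give the same running values here.

```scala
// Hypothetical helper for deriving expected test output; not part of the PR.
object ExpectedOverWindowSketch {

  // (rowtime, key, value): an assumed layout for the illustration
  type In = (Long, String, Int)

  // Only two distinct keys, three rows each, arriving out of timestamp order.
  val rows: Seq[In] = Seq(
    (3L, "k1", 30),
    (1L, "k1", 10),
    (2L, "k2", 5),
    (2L, "k1", 20),
    (5L, "k2", 7),
    (4L, "k2", 6))

  // For each key, sort by rowtime and compute the running SUM and COUNT over
  // "unbounded preceding and current row". Result: (key, rowtime, sum, count).
  def expected(input: Seq[In]): Seq[(String, Long, Int, Long)] =
    input.groupBy(_._2).toSeq.sortBy(_._1).flatMap { case (key, group) =>
      val ordered = group.sortBy(_._1)
      val running = ordered
        .scanLeft((0, 0L)) { case ((sum, cnt), (_, _, v)) => (sum + v, cnt + 1) }
        .tail // drop the initial (0, 0L) seed
      ordered.zip(running).map { case ((ts, _, _), (sum, cnt)) => (key, ts, sum, cnt) }
    }
}
```

If the operator sorted incorrectly, the row with value 30 (which arrives first for `k1`) would get a running sum of 30 instead of 60, so a data set of this shape would expose the bug.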
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623601 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623012 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106617971 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -112,7 +113,14 @@ class DataStreamOverAggregate( "condition.") } case _: RowTimeType => -throw new TableException("OVER Window of the EventTime type is not currently supported.") +if (overWindow.lowerBound.isUnbounded && + overWindow.upperBound.isCurrentRow) { --- End diff -- Can you move this condition onto the line above?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624514 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106626434 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { --- End diff -- Can you also add a few unit tests to `org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala` to verify that the query is correctly translated? Thanks!
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106625475 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase { result.addSink(new StreamITCase.StringSink) env.execute() } + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testUnboundedEventTimeRowWindowWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +env.setParallelism(1) + +val sqlQuery = "SELECT a, b, c, " + + "SUM(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "count(a) over (" + + "partition by a order by rowtime() range between unbounded preceding and current row), " + + "avg(a) over (" + --- End diff -- Computing `avg`, `max`, `min` on the partition key is not very meaningful. Can you compute those on `b`?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106624582 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106622479 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) --- End diff -- we should register the timer based on the record timestamp: `ctx.timerService.registerEventTimeTimer(ctx.timestamp + 1)` --- I
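The difference between the two registration choices can be illustrated with a toy model. This is a sketch in plain Scala, not Flink's `TimerService`: `ToyTimerService`, `TimerSketch`, and `firstFiring` are hypothetical names, and the only behaviour assumed from Flink is that an event-time timer fires once the watermark reaches the timer's time.

```scala
import scala.collection.mutable

// Toy stand-in for an event-time timer service (hypothetical, not Flink's API).
final class ToyTimerService {
  private val timers = mutable.TreeSet.empty[Long]
  private var watermark = Long.MinValue

  def currentWatermark: Long = watermark

  def register(time: Long): Unit = timers += time

  // Advances the watermark and returns the timers that fire, in ascending order.
  def advanceTo(newWatermark: Long): List[Long] = {
    watermark = newWatermark
    val due = timers.takeWhile(_ <= newWatermark).toList
    timers --= due
    due
  }
}

object TimerSketch {
  // Returns the first watermark at which a timer registered at `timerTime` fires.
  def firstFiring(timerTime: Long, watermarks: Seq[Long]): Option[Long] = {
    val service = new ToyTimerService
    service.register(timerTime)
    watermarks.find(wm => service.advanceTo(wm).nonEmpty)
  }
}
```

For a record with timestamp 10 that arrives while the watermark is 3, and subsequent watermarks 5, 8, 11: a timer at `currentWatermark + 1` (time 4) fires at watermark 5, while the record is still ahead of the watermark; a timer at `timestamp + 1` (time 11) fires only at watermark 11, after the watermark has passed the record's timestamp.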
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106623704 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + /** +* Process one element from the input stream, not emit the output +* +* @param value The input value. +* @param ctx The ctx to register timer or get current time +* @param out The collector for returning result values. +* +*/ + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + /** +* Called when a timer set fires, sort current record
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106589672 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( --- End diff -- The outer parentheses around `(new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType))` can be omitted.
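A minimal sketch of the point about the redundant parentheses, assuming nothing from Flink itself: `PairInfo` and `describe` are hypothetical stand-ins for `TupleTypeInfo` and `createSerializer`. In Scala, a `new` expression with an argument list can be followed directly by a method call, so both forms below should behave identically.

```scala
object ParenthesesExample {
  // Hypothetical stand-in for a type-info class; not part of Flink.
  final class PairInfo(val first: String, val second: String) {
    def describe(): String = s"PairInfo($first, $second)"
  }

  // With the extra outer parentheses, as in the reviewed line.
  val withParens: String = (new PairInfo("Long", "Row")).describe()

  // Without them: `new` with an argument list can be followed by a method call.
  val withoutParens: String = new PairInfo("Long", "Row").describe()
}
```

The parentheses are only needed when the constructor is written without an argument list and the call would otherwise be ambiguous, which is not the case in the reviewed line.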
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590214 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -159,6 +167,46 @@ class DataStreamOverAggregate( result } + def createUnboundedAndCurrentRowEventTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val keyedStream = inputDS.keyBy(partitionKeys: _*) +val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction( --- End diff -- `val processFunction = AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(` can be declared before the if/else, because it does not depend on `partitionKeys`.
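The suggestion to move the declaration out of the branch can be sketched as follows. This is an illustration only, not the PR's code: `ProcessFn`, `createProcessFn`, and `plan` are hypothetical names, and the strings stand in for the keyed and non-keyed stream pipelines.

```scala
object HoistExample {
  // Hypothetical stand-in for the process function built by AggregateUtil.
  final case class ProcessFn(name: String)

  // Hypothetical factory; it takes no partition keys, mirroring the review remark.
  def createProcessFn(): ProcessFn = ProcessFn("unboundedEventTimeOver")

  def plan(partitionKeys: Array[Int]): String = {
    // Built once, before branching, because it does not depend on partitionKeys.
    val processFunction = createProcessFn()

    if (partitionKeys.nonEmpty) {
      // partitioned aggregation
      s"keyed(${partitionKeys.mkString(",")}) -> ${processFunction.name}"
    } else {
      // non-partitioned aggregation
      s"non-keyed -> ${processFunction.name}"
    }
  }
}
```

Declaring the value once avoids repeating the same factory call in both branches and makes it visible that only the stream setup differs between them.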
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106592513 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + +var rowList = rowState.get.iterator +var sortList = new util.LinkedList[Tuple2[Long, Row]]() +while (rowList.hasNext) { + insertToSortedList(rowList.next, sortList) +
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590692 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + +var rowList = rowState.get.iterator +var sortList = new util.LinkedList[Tuple2[Long, Row]]() +while (rowList.hasNext) { --- End diff -- `rowList` and `sortList` use `
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590822 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + +var rowList = rowState.get.iterator +var sortList = new util.LinkedList[Tuple2[Long, Row]]() +while (rowList.hasNext) { + insertToSortedList(rowList.next, sortList) +
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590304 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,35 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( --- End diff -- Function names should start with a lowercase letter.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106590402 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +91,35 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType): UnboundedEventTimeOverProcessFunction = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) + +val aggregationStateType: RowTypeInfo = --- End diff -- ``` val aggregationStateType: RowTypeInfo = createDataSetAggregateBufferDataType(Array(), aggregates, inputType) ``` This will be more readable.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106592195 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.java.tuple.Tuple2 +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param intermediateType the intermediate row tye which the state saved + * @param inputType the input row tye which the state saved + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val intermediateType: TypeInformation[Row], +private val inputType: TypeInformation[Row]) + extends ProcessFunction[Row, Row]{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowState: ListState[Tuple2[Long, Row]] = _ + + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + intermediateType.createSerializer(getRuntimeContext.getExecutionConfig) +val stateDescriptor: ValueStateDescriptor[Row] = + new 
ValueStateDescriptor[Row]("accumulatorstate", valueSerializer) +accumulatorState = getRuntimeContext.getState[Row](stateDescriptor) + +val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] = + (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, inputType)).createSerializer( + getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, Row]]] +val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] = + new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", tupleSerializer) +rowState = getRuntimeContext.getListState[Tuple2[Long, Row]](tupleStateDescriptor) + + } + + override def processElement( + input: Row, + ctx: ProcessFunction[Row, Row]#Context, + out: Collector[Row]): Unit = { + +// discard later record +if (ctx.timestamp() >= ctx.timerService().currentWatermark()) { + // ensure every key just register on timer + ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1) + + rowState.add(new Tuple2(ctx.timestamp, input)) +} + } + + override def onTimer( + timestamp: Long, + ctx: ProcessFunction[Row, Row]#OnTimerContext, + out: Collector[Row]): Unit = { + +var rowList = rowState.get.iterator +var sortList = new util.LinkedList[Tuple2[Long, Row]]() +while (rowList.hasNext) { + insertToSortedList(rowList.next, sortList) +
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user shijinkui commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106411203 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { --- End diff -- @hongyuhong the file names of all Scala UTs and ITs should end with `ITCase` or `Suite`.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106412030 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- We can of course keep a branch with these changes around. However, I'd also like to add a non-retract version for event-time OVER UNBOUNDED windows. We haven't started the work on retractions yet, so it is not certain that we will make it for the 1.3 release.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106409879 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- @fhueske - I understand. Do you think we should put this in a branch and use it as a model for adding retraction to the other operators? It could be combined with the design document proposed for retraction on the mailing list. What do you think?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106386147 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106373065 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEl
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106366395 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- Adding retraction support is a much bigger issue. If we add support for this operator and merge it to master, the master branch will be in an inconsistent state because some operators support retraction and others don't. We must ensure that adding retraction support is an "atomic" operation, i.e., either we add support for all operators or for none. I do not want to be in the situation that the work is half (or 80%) done when the 1.3 release branch is forked off on May 1st.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106363331 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- @fhueske - I am confused about this reply. In the previous discussions on the mailing list, as well as in the reply on the discussion about supporting retraction for Flink Streaming, the idea was to build such a feature. You said "IMO, we must make sure that either all operators support retraction or none". Why shouldn't we use this as a blueprint for how to build retraction for the others?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106363300 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- @fhueske - I am confused about this reply. In the previous discussions on the mailing list, as well as in the reply on the discussion about supporting retraction for Flink Streaming, the idea was to build such a feature. You said "IMO, we must make sure that either all operators support retraction or none". Why shouldn't we use this as a blueprint for how to build retraction for the others?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106359002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106324953 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEl
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106220993 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def proce
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106203455 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/UnboundedEventTimeOverProcessFuncTest.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.api.java.stream.sql; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.aggfunctions.IntSumAggFunction; +import org.apache.flink.table.runtime.aggregate.UnboundedEventTimeOverProcessFunction; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Tests {@link ProcessOperator}. + */ +public class UnboundedEventTimeOverProcessFuncTest extends TestLogger { --- End diff -- Please implement the test in Scala
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106183250 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106195270 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106188790 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106207575 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { --- End diff -- Can you add the test methods to `org.apache.flink.table.api.scala.stream.sql.SqlITCase.scala`? Each test class that extends `StreamingWithStateTestBase` adds overhead because it starts a Flink mini cluster which takes a bit of time.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106184455 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex --- End diff -- We use `while` loops to iterate over the AggregateFunctions. So we do not need to `zipWithIndex`.
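A minimal sketch of the index-based `while` loop mentioned in the comment above, for illustration only. It does not use the Flink `AggregateFunction` API: `SimpleAgg`, `SumAgg`, `MaxAgg`, and `accumulate` are hypothetical stand-ins, and plain `Long` arrays take the place of `Row`. The point is that `aggregates(i)` and `aggFields(i)` can be addressed with the same counter, so no `(function, index)` tuple array is needed.

```scala
// Hypothetical stand-in for an aggregate function (NOT the Flink API).
trait SimpleAgg {
  def add(acc: Long, value: Long): Long
}

object SumAgg extends SimpleAgg {
  def add(acc: Long, value: Long): Long = acc + value
}

object MaxAgg extends SimpleAgg {
  def add(acc: Long, value: Long): Long = math.max(acc, value)
}

object WhileLoopSketch {
  // Applies aggregates(i) to the input field at aggFields(i) and
  // updates accs(i) in place. One counter indexes all three arrays.
  def accumulate(
      aggregates: Array[SimpleAgg],
      aggFields: Array[Int],
      accs: Array[Long],
      input: Array[Long]): Unit = {
    var i = 0
    while (i < aggregates.length) {
      accs(i) = aggregates(i).add(accs(i), input(aggFields(i)))
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val aggregates = Array[SimpleAgg](SumAgg, MaxAgg)
    val aggFields = Array(0, 1)
    val accs = Array(0L, Long.MinValue)
    accumulate(aggregates, aggFields, accs, Array(3L, 7L))
    accumulate(aggregates, aggFields, accs, Array(4L, 5L))
    println(accs.mkString(", ")) // prints "7, 7"
  }
}
```

Compared with `zipWithIndex`, this form allocates no tuples and creates no closure per element, which is presumably why it is preferred on the per-record path.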
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106190096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106186862 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- The current state of the Table API does not know how to handle retractions. So we can send every result just once and never correct it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
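The emit-once constraint described in this comment can be sketched in plain Java, independent of Flink. This is only an illustration, not the PR's implementation: the class `EmitOnceRunningSum` and its methods `onElement` and `onWatermark` are hypothetical names, and the sketch assumes a ROWS-style running sum in which every row gets its own result. Rows are buffered until a watermark passes them, then emitted in timestamp order. A row that arrives behind the last watermark is dropped, because its result could only be incorporated by correcting rows that were already sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for an emit-once operator; not the code under review. */
final class EmitOnceRunningSum {
    // Rows buffered by event timestamp until a watermark passes them.
    private final TreeMap<Long, List<Integer>> pending = new TreeMap<>();
    private long lastWatermark = Long.MIN_VALUE;
    private long runningSum = 0;

    /** Buffers a value; returns false if the row is late and therefore dropped. */
    boolean onElement(long timestamp, int value) {
        if (timestamp <= lastWatermark) {
            // Results up to lastWatermark were already emitted and cannot be corrected.
            return false;
        }
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        return true;
    }

    /** Emits {timestamp, value, runningSum} for each buffered row with timestamp <= watermark. */
    List<long[]> onWatermark(long watermark) {
        List<long[]> emitted = new ArrayList<>();
        // View of all buffered timestamps up to and including the watermark, in ascending order.
        Map<Long, List<Integer>> ready = pending.headMap(watermark, true);
        for (Map.Entry<Long, List<Integer>> entry : ready.entrySet()) {
            for (int value : entry.getValue()) {
                runningSum += value;
                emitted.add(new long[] {entry.getKey(), value, runningSum});
            }
        }
        ready.clear(); // clearing the view removes the emitted rows from the buffer
        lastWatermark = Math.max(lastWatermark, watermark);
        return emitted;
    }
}
```

Because every result leaves the operator exactly once and in timestamp order, no downstream retraction is needed. The price is that results are delayed until the watermark arrives and late rows are lost.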
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106203989 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/UnboundedEventTimeOverProcessFuncTest.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.api.java.stream.sql; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.aggfunctions.IntSumAggFunction; +import org.apache.flink.table.runtime.aggregate.UnboundedEventTimeOverProcessFunction; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Tests {@link ProcessOperator}. 
+ */ +public class UnboundedEventTimeOverProcessFuncTest extends TestLogger { + + @Test + public void testUnboundedEventSnapshotAndRestore() throws Exception { + + AggregateFunction[] aggFunc = new AggregateFunction[1]; + aggFunc[0] = new IntSumAggFunction(); + int[] aggField = new int[1]; + aggField[0] = 0; + + TypeInformation returnType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + TypeInformation interMediateType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, aggFunc[0].getAccumulatorType()); --- End diff -- `interMediate` -> `intermediate` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106210445 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -124,6 +124,21 @@ under the License. ${project.version} test + --- End diff -- add newline --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106203953 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/UnboundedEventTimeOverProcessFuncTest.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.api.java.stream.sql; + + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.table.functions.AggregateFunction; +import org.apache.flink.table.functions.aggfunctions.IntSumAggFunction; +import org.apache.flink.table.runtime.aggregate.UnboundedEventTimeOverProcessFunction; +import org.apache.flink.types.Row; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Tests {@link ProcessOperator}. 
+ */ +public class UnboundedEventTimeOverProcessFuncTest extends TestLogger { + + @Test + public void testUnboundedEventSnapshotAndRestore() throws Exception { + + AggregateFunction[] aggFunc = new AggregateFunction[1]; + aggFunc[0] = new IntSumAggFunction(); + int[] aggField = new int[1]; + aggField[0] = 0; + + TypeInformation returnType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO); + TypeInformation interMediateType = new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, aggFunc[0].getAccumulatorType()); + KeySelector keyselector = new KeySelector() { + @Override + public Tuple getKey(Row value) throws Exception { + return new Tuple1(1); + } + }; + + KeyedProcessOperator operator = + new KeyedProcessOperator<>( + new UnboundedEventTimeOverProcessFunction( + aggFunc, aggField, 1, interMediateType, keyselector, + new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO))); + + OneInputStreamOperatorTestHarness testHarness = + new KeyedOneInputStreamOperatorTestHarness<>( + operator, keyselector, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO)); + + testHarness.setup(); + testHarness.open(); + + Row inputRow = new Row(1); + inputRow.setField(0, 1); + testHarness.processElement(new StreamRecord<>(inputRow, 12L)); + testHarness.processElement(new StreamRecord<>(inputRow, 12L)); + testHarness.processElement(new StreamRecord<>(inputRow, 12L)); + + ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue<>(); + Row res = new Row(2); --- End diff -- Use `Row.of()` to generate rows more concisely. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106181911 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], --- End diff -- `intermediate` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106201295 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -91,6 +92,38 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventTimeOverProcessFunction( + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + inputType: RelDataType, + keySelector: KeySelector[Row, Tuple], + keyType: TypeInformation[Tuple]): UnboundedEventTimeOverProcessFunction = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) + +val aggregationStateType: RowTypeInfo = + createDataSetAggregateBufferDataType( +(0 until inputType.getFieldCount).toArray, aggregates, inputType) + +new UnboundedEventTimeOverProcessFunction( + aggregates, + aggFields, + inputType.getFieldCount, + aggregationStateType, + keySelector, --- End diff -- We don't need the `keySelector` and `keyType` if we follow the design I suggest below. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106206858 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { --- End diff -- Can you add the test methods to `org.apache.flink.table.api.scala.stream.sql.SqlITCase.scala`? Each test class that extends `StreamingWithStateTestBase` adds overhead because it starts a Flink mini cluster which takes a bit of time. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r106190499 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventTimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def processEleme
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105337481 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- Yes, there may be a memory problem if the watermark interval is too long. But if we use the state backend, the cost of serializing the whole list on every update is too large. As you say, a sorted state backend that provides fast search and update would be great for me.
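The serialization cost mentioned here can be made concrete with a rough, Flink-independent sketch. It assumes a backend that serializes a state value on every write, and it models each list entry as a single 8-byte timestamp. The class `ListStateCostSketch` and both method names are hypothetical. The first method rewrites the whole list after each append, the second writes only the new entry, as a per-entry (map-like) state could.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical cost model; real state backends serialize differently. */
final class ListStateCostSketch {

    /** Total bytes written when the whole list is re-serialized after each of n appends. */
    static long bytesWholeList(int n) {
        List<Long> timestamps = new ArrayList<>();
        long total = 0;
        for (int i = 0; i < n; i++) {
            timestamps.add((long) i);
            // Serialize every element again, including those written in earlier rounds.
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES * timestamps.size());
            for (long ts : timestamps) {
                buffer.putLong(ts);
            }
            total += buffer.position();
        }
        return total;
    }

    /** Total bytes written when each append serializes only the new entry. */
    static long bytesPerEntry(int n) {
        long total = 0;
        for (int i = 0; i < n; i++) {
            ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
            buffer.putLong(i);
            total += buffer.position();
        }
        return total;
    }
}
```

Under these assumptions the whole-list variant writes 8 * n * (n + 1) / 2 bytes for n appends, while the per-entry variant writes 8 * n bytes. That quadratic growth is the overhead the comment weighs against keeping the list on the heap.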
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323567 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( --- End diff -- I suggest changing `Eventtime` to `EventTime`. What do you think?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322751 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + --- End diff -- I suggest partitioning by the `c` field instead. Just a suggestion.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322914 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") +val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") 
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +assertEquals(Some("6"), StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1)) --- End diff -- Can you check the result of every output row, not only the last one?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322654 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) --- End diff -- This test data is a bit simple. I recommend enriching it, for example:
```
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 5L, "Hello"))
data.+=((1, 3L, "Hello"))
data.+=((3, 7L, "Hello world"))
data.+=((4, 9L, "Hello world"))
data.+=((5, 8L, "Hello world"))
```
What do you think?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105324505 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = +new java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] + + /** For store timeSectionsMap **/ + private var timeSectionsState: ListState[String] = _ + private var inputKeySerializer: TypeSerializer[Tuple] = _ + private var timeSerializer: TypeSerializer[TimeWindow] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) +val valueSerializer: TypeSerializer[Row] = + interMediateType.createSerializer(getRuntimeContext.getExecutionConfig) +timeSerializer = new TimeWindow.Serializer +val stateDescriptor: MapStateDescriptor[TimeWindow, Row] = + new MapStateDescriptor[TimeWindow, Row]("rowtimeoverstate", timeSerializer, valueSerializer) +inputKeySerializer = keyType.createSerializer(getRuntimeContext.getExecutionConfig) +state = getRuntimeContext.getMapState[TimeWindow, Row](stateDescriptor) + } + + override def proce
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105322866 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) --- End diff -- Why use a fixed value for the watermark? Can the watermark be generated from the data instead?
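One way to derive the watermark from the data rather than from a constant is to track the largest timestamp seen so far and let the watermark trail it by a small bound. The following is a minimal plain-Scala sketch of that idea and not the PR's code: `DataDrivenWatermark`, `maxOutOfOrderness`, and the choice of the tuple's `Long` field as the event timestamp are all assumptions made for illustration, and nothing here depends on Flink. In a test, the two methods would presumably back `extractTimestamp` and `getCurrentWatermark` of the `AssignerWithPeriodicWatermarks` shown in the diff.

```scala
// Hypothetical helper: models a watermark that follows the data.
// It does not use any Flink API; names and the lateness bound are assumptions.
final class DataDrivenWatermark(maxOutOfOrderness: Long) {
  // Largest event timestamp observed so far. The initial value is chosen so
  // that the first watermark is Long.MinValue without overflowing.
  private var maxTimestamp: Long = Long.MinValue + maxOutOfOrderness

  // Uses the tuple's Long field as the event timestamp (an assumption).
  def extractTimestamp(element: (Int, Long, String)): Long = {
    val ts = element._2
    if (ts > maxTimestamp) maxTimestamp = ts
    ts
  }

  // The watermark trails the largest timestamp by the allowed out-of-orderness.
  def currentWatermark: Long = maxTimestamp - maxOutOfOrderness
}

object DataDrivenWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val wm = new DataDrivenWatermark(maxOutOfOrderness = 2L)
    val data = Seq((1, 1L, "Hi"), (2, 2L, "Hello"), (3, 5L, "Hello"), (1, 3L, "Hello"))
    data.foreach { e =>
      wm.extractTimestamp(e)
      println(s"element=$e watermark=${wm.currentWatermark}")
    }
  }
}
```

Because the watermark never moves backwards, an out-of-order element such as `(1, 3L, "Hello")` arriving after timestamp 5 leaves the watermark unchanged.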
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105325003 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventtimeOverProcessFunction.scala --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import java.util + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row +import org.apache.flink.streaming.api.functions.{ProcessFunction} +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state._ +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.typeutils.base.StringSerializer +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction +import org.apache.flink.streaming.api.operators.TimestampedCollector +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} + +import scala.collection.mutable.ArrayBuffer + +/** + * A ProcessFunction to support unbounded event-time over-window + * + * @param aggregates the aggregate functions + * @param aggFields the filed index which the aggregate functions use + * @param forwardedFieldCount the input fields count + * @param interMediateType the intermediate row tye which the state saved + * @param keySelector the keyselector + * @param keyType the key type + * + */ +class UnboundedEventtimeOverProcessFunction( +private val aggregates: Array[AggregateFunction[_]], +private val aggFields: Array[Int], +private val forwardedFieldCount: Int, +private val interMediateType: TypeInformation[Row], +private val keySelector: KeySelector[Row, Tuple], +private val keyType: TypeInformation[Tuple]) + extends ProcessFunction[Row, Row] + with 
CheckpointedFunction{ + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var state: MapState[TimeWindow, Row] = _ + private val aggregateWithIndex: Array[(AggregateFunction[_], Int)] = aggregates.zipWithIndex + + /** Sorted list per key for choose the recent result and the records need retraction **/ + private val timeSectionsMap: java.util.HashMap[Tuple, java.util.LinkedList[TimeWindow]] = --- End diff -- It's not a good idea to use an in-memory data structure here; I worry about OOM problems with large data volumes. I suggest using the state backend instead. Unfortunately, the state backend does not currently support sorting, so we may need to think about other ways. I'm not sure yet; I need to think about it and will give you feedback. What do you think?
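The concern above is about keeping a sorted per-key list in operator memory. One possible direction, sketched here in plain Scala and not taken from the PR, is to store rows unsorted in a map keyed by timestamp and to sort only the timestamps that have become due when the watermark advances. `SortOnWatermarkBuffer` and its methods are hypothetical names; the `mutable.Map` merely stands in for a state-backend map (for example a `MapState`), and a running `SUM` stands in for the aggregate functions.

```scala
import scala.collection.mutable

// Hypothetical model of keyed state: timestamp -> values with that timestamp.
// A mutable.Map stands in for a state-backend map; no Flink API is used.
final class SortOnWatermarkBuffer {
  private val rowsByTime = mutable.Map.empty[Long, Vector[Int]]
  private var runningSum: Long = 0L

  // Buffer a value under its event timestamp; no ordering is maintained here.
  def add(timestamp: Long, value: Int): Unit =
    rowsByTime(timestamp) = rowsByTime.getOrElse(timestamp, Vector.empty) :+ value

  // When the watermark advances, sort only the due timestamps, emit running
  // sums in event-time order, and drop the emitted entries from the buffer.
  def onWatermark(watermark: Long): Seq[(Long, Long)] = {
    val due = rowsByTime.keys.filter(_ <= watermark).toSeq.sorted
    due.flatMap { ts =>
      val emitted = rowsByTime(ts).map { v =>
        runningSum += v
        (ts, runningSum)
      }
      rowsByTime.remove(ts)
      emitted
    }
  }
}
```

In this model only the due timestamps are sorted in memory at any one time; whether that is small enough in practice depends on how far the watermark lags behind the data.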
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323083 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UnboundedRowtimeOverTest.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.{TableEnvironment, TableException} +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.utils.StreamTestData.Small4Tuple +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit._ + +import scala.collection.mutable + +class UnboundedRowtimeOverTest extends StreamingWithStateTestBase { + + /** test sliding event-time unbounded window with partition by **/ + @Test + def testWithPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, b, SUM(a) over (partition by b order by rowtime() range between " + + "unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected1 = mutable.MutableList( + "1,1,1", "2,2,2", "3,2,5") +val expected2 = mutable.MutableList( + "1,1,1", "2,2,5", "3,2,3") 
+assertTrue(expected1.equals(StreamITCase.testResults.sorted) || + expected2.equals(StreamITCase.testResults.sorted)) + } + + /** test sliding event-time unbounded window without partitiion by **/ + @Test + def testWithoutPartition(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT SUM(a) " + + "over (order by rowtime() range between unbounded preceding and current row) from T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env) + .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(Int, Long, String)] { + +def getCurrentWatermark: Watermark = new Watermark(130L) + +def extractTimestamp(element: (Int, Long, String), previousElementTimestamp: Long): Long = + 140 + }).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +assertEquals(Some("6"), StreamITCase.testResults.sorted.get(StreamITCase.testResults.size - 1)) + } + + /** test sliding event-time unbounded window with later record **/ + @Test + def test
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r105323410 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -80,6 +81,38 @@ object AggregateUtil { } /** +* Create an [[ProcessFunction]] to evaluate final aggregate value. +* +* @param namedAggregates List of calls to aggregate functions and their output field names +* @param inputType Input row type +* @return [[UnboundedProcessingOverProcessFunction]] +*/ + private[flink] def CreateUnboundedEventtimeOverProcessFunction( --- End diff -- I suggest changing `Eventtime` to `EventTime`. What do you think?
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3386#discussion_r103404131 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} +import FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, WindowedStream} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.plan.nodes.CommonAggregate + +class DataStreamSlideEventTimeRowAgg( +namedProperties: Seq[NamedWindowProperty], +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with CommonAggregate + with DataStreamRel { + + override def deriveRowType(): RelDataType = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamSlideEventTimeRowAgg( + namedProperties, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) --- End diff -- In fact, we can discard late events, but we must have a strategy that defines which elements count as late. The current implementation is also such a strategy, but it will lose too many events, and the computed results are unpredictable and cannot be reproduced on replay. This is unacceptable in production. What do you think?
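To make "a strategy that defines which elements are late" concrete, here is a minimal plain-Scala sketch of one candidate rule: a record is late once the watermark has reached or passed its timestamp. This is an illustration under stated assumptions, not the PR's implementation; `LatenessRule`, `isLate`, and `partitionByLateness` are hypothetical names, and records are modeled as `(timestamp, value)` pairs.

```scala
// Hypothetical lateness rule: a record is late once the watermark has
// reached or passed its timestamp. This models the idea; it is not Flink code.
object LatenessRule {
  def isLate(timestamp: Long, watermark: Long): Boolean = timestamp <= watermark

  // Splits (timestamp, value) records into (accepted, dropped) for a watermark.
  def partitionByLateness(
      records: Seq[(Long, Int)],
      watermark: Long): (Seq[(Long, Int)], Seq[(Long, Int)]) =
    records.partition { case (ts, _) => !isLate(ts, watermark) }

  def main(args: Array[String]): Unit = {
    val (accepted, dropped) =
      partitionByLateness(Seq((140L, 1), (120L, 2), (131L, 3)), watermark = 130L)
    println(s"accepted=$accepted dropped=$dropped")
  }
}
```

A rule of this shape depends only on the record's timestamp and the watermark, so replaying the same input with the same watermarks drops the same records, which addresses the reproducibility concern raised above.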
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user hongyuhong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r103125615

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSlideEventTimeRowAgg(
+      namedProperties,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
--- End diff --

Yes, we don't know which element will arrive first. So I think we should use the retraction solution we discussed before.

```
140 -> (140, 1)
145 -> (145, 3)
130 -> (130, 3), (140, 4), (145, 6)
1400010 -> (1400010, 10)
```

We would persist the aggregation state of every element for only one watermark period. Once a watermark arrives, any subsequent record with a timestamp before that watermark is discarded. The problem is that one element may produce multiple outputs at different times. In the example, the first element emits one result when it arrives and another when the third element arrives, so the downstream operator or the user has to apply an update. What's your opinion?
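The retraction idea above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the operator proposed in the PR: the class `RetractingRunningSumSketch` and its methods `onElement` and `onWatermark` are hypothetical, the state is an in-memory `TreeMap` rather than Flink state, and the aggregate is a plain running sum. Each arriving element re-emits the running sum for its own timestamp and for every buffered timestamp after it; a watermark folds everything at or before it into a base sum, after which older records are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the retraction approach, not code from this pull request.
final class RetractingRunningSumSketch {
    // Values buffered since the last watermark, keyed by event timestamp.
    private final TreeMap<Long, Long> valuesByTime = new TreeMap<>();
    // Sum of all values already finalized by a watermark.
    private long finalizedSum = 0L;
    private boolean hasWatermark = false;
    private long currentWatermark = 0L;

    // Returns the (timestamp, sum) results that must be emitted or updated for this element.
    List<String> onElement(long timestamp, long value) {
        List<String> emitted = new ArrayList<>();
        if (hasWatermark && timestamp <= currentWatermark) {
            return emitted; // behind the watermark: discarded
        }
        valuesByTime.merge(timestamp, value, Long::sum);
        long sum = finalizedSum;
        for (long earlier : valuesByTime.headMap(timestamp, false).values()) {
            sum += earlier;
        }
        // The new element and every later timestamp get a fresh result.
        for (Map.Entry<Long, Long> e : valuesByTime.tailMap(timestamp, true).entrySet()) {
            sum += e.getValue();
            emitted.add("(" + e.getKey() + ", " + sum + ")");
        }
        return emitted;
    }

    // Folds everything at or before the watermark into the base sum and frees its state.
    void onWatermark(long watermark) {
        Map<Long, Long> finalized = valuesByTime.headMap(watermark, true);
        for (long v : finalized.values()) {
            finalizedSum += v;
        }
        finalized.clear();
        hasWatermark = true;
        currentWatermark = watermark;
    }

    public static void main(String[] args) {
        RetractingRunningSumSketch agg = new RetractingRunningSumSketch();
        System.out.println(agg.onElement(140L, 1L));     // [(140, 1)]
        System.out.println(agg.onElement(145L, 2L));     // [(145, 3)]
        System.out.println(agg.onElement(130L, 3L));     // [(130, 3), (140, 4), (145, 6)]
        System.out.println(agg.onElement(1400010L, 4L)); // [(1400010, 10)]
    }
}
```

The cost of this design is visible in the third call: a single late element triggers one update per buffered timestamp after it, which is why the period between watermarks bounds both the state size and the number of updates.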
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102928695

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateReduceFunction.scala ---
@@ -52,9 +53,14 @@ class IncrementalAggregateReduceFunction(
     // and directly merge value1 and value2.
     val accumulatorRow = new Row(intermediateRowArity)
-    // copy all fields of value1 into accumulatorRow
-    (0 until intermediateRowArity)
-    .foreach(i => accumulatorRow.setField(i, value1.getField(i)))
+    // copy non agg fields of value2 into accumulatorRow
+    (0 until aggOffset)
+      .foreach(i => accumulatorRow.setField(i, value2.getField(i)))
+
+    // copy agg fields of value1 into accumulatorRow
--- End diff --

I mean that in the `groupWindow` case we only need a single pass over the fields to do the assignment.
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102927451

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamSlideEventTimeRowAgg.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.plan.nodes.CommonAggregate
+
+class DataStreamSlideEventTimeRowAgg(
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with CommonAggregate
+  with DataStreamRel {
+
+  override def deriveRowType(): RelDataType = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamSlideEventTimeRowAgg(
+      namedProperties,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
--- End diff --

I don't think the approach `I check whether the current data is out of order in WindowOperator isLate function, and now just discard if islate.` will work well, because the logic in that method is:

```
if (windowAssigner instanceof GlobalEventTimeRowWindowAssigner) {
    return windowAssignerContext.getCurrentElementTime() < windowAssignerContext.getCurrentMaxTime();
}
```

For example, with this test data:

```
1, 1L, "Hi", 140L
2, 2L, "Hello", 145L
3, 2L, "Hello w", 130
4, 3L, "Hello world", 1400010L
```

You do not know which element arrives first, so you can get a different result every time you run a query such as `SELECT d, SUM(a) over (order by rowtime() range between unbounded preceding and current row) from T1`. You can get the following results.

The first time:

```
145,2
1400010,6
```

The second time:

```
140,1
1400010,5
```

The third time:

```
130,3
145,5
1400010,9
```

So, IMHO, event-time OVER must handle the situation above. What do you think?
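The three outcomes listed above can be reproduced with a small stand-alone simulation. This is a sketch, not Flink code: the class `MaxTimeLatenessDemo` is hypothetical, each row is reduced to the pair `{a, rowtime}` taken from the test data, and the arrival orders are assumptions chosen to match the three reported results. The discard rule is the one quoted from `isLate`: an element is dropped when its time is less than the largest time seen so far.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the discard-if-older-than-max rule, not code from this pull request.
final class MaxTimeLatenessDemo {

    // Each row is {a, rowtime}. Returns "rowtime,sum" for every row that is not discarded.
    static List<String> run(long[][] arrivalOrder) {
        List<String> out = new ArrayList<>();
        boolean seenAny = false;
        long maxTime = 0L;
        long sum = 0L;
        for (long[] row : arrivalOrder) {
            long a = row[0];
            long time = row[1];
            if (seenAny && time < maxTime) {
                continue; // treated as late and discarded
            }
            maxTime = time;
            seenAny = true;
            sum += a;
            out.add(time + "," + sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] first = {{2, 145}, {1, 140}, {3, 130}, {4, 1400010}};
        long[][] second = {{1, 140}, {4, 1400010}, {2, 145}, {3, 130}};
        long[][] third = {{3, 130}, {2, 145}, {1, 140}, {4, 1400010}};
        System.out.println(run(first));  // [145,2, 1400010,6]
        System.out.println(run(second)); // [140,1, 1400010,5]
        System.out.println(run(third));  // [130,3, 145,5, 1400010,9]
    }
}
```

The input rows are identical in all three runs; only the arrival order changes, and that alone changes both which rows survive and the sums attached to them.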
[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r102925177

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -563,7 +590,14 @@ private void emitWindowContents(W window, ACC contents) throws Exception {
      * of the given window.
      */
     protected boolean isLate(W window) {
-        return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
+        if (windowAssigner.isEventTime()) {
+            if (windowAssigner instanceof GlobalEventTimeRowWindowAssigner) {
+                return windowAssignerContext.getCurrentElementTime() < windowAssignerContext.getCurrentMaxTime();
--- End diff --

I don't think this logic works well if the data arrives out of order. What do you think?