[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3783#discussion_r125677038 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -296,6 +299,39 @@ class CodeGenerator( fields.mkString(", ") } +def genInitialize(): String = { + + val sig: String = +j""" + | public void initialize( + |org.apache.flink.api.common.functions.RuntimeContext ctx + | )""".stripMargin + + val initDist: String = if( distinctAggsFlags.isDefined ) { +val statePackage = "org.apache.flink.api.common.state" +val distAggsFlags = distinctAggsFlags.get + for(i <- distAggsFlags.indices) yield +if(distAggsFlags(i)) { + val typeString = javaTypes(aggFields(i)(0)) --- End diff -- Actually, why shouldn't we use Rows directly? Is there any specific reason to prefer a Tuple? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
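The quoted `genInitialize` builds the Java source of an `initialize(RuntimeContext ctx)` method as a string, with one piece per aggregate flagged as DISTINCT. Since the diff is cut off before the generated body, the following is only a minimal sketch of that string-templating pattern. The object name `GenInitializeSketch`, the generated field names `distState<i>`, and the state names `distinctValues<i>` are assumptions made for illustration, not the PR's actual output.

```scala
object GenInitializeSketch {
  // Package of Flink's state descriptors, as in the quoted diff.
  private val statePackage = "org.apache.flink.api.common.state"

  /** Emits Java source text with one state-initialization statement
    * per aggregate whose DISTINCT flag is set.
    * `keyTypes(i)` is the Java type name of the i-th aggregate's input field.
    */
  def genInitialize(distinctFlags: Seq[Boolean], keyTypes: Seq[String]): String = {
    require(distinctFlags.length == keyTypes.length, "one key type per aggregate")

    // Hypothetical generated statement: a MapState from value to occurrence count.
    val stmts = for (i <- distinctFlags.indices if distinctFlags(i)) yield
      s"""  distState$i = ctx.getMapState(new $statePackage.MapStateDescriptor(""" +
        s""""distinctValues$i", ${keyTypes(i)}.class, Long.class));"""

    val sig =
      "public void initialize(org.apache.flink.api.common.functions.RuntimeContext ctx)"
    (Seq(sig + " {") ++ stmts ++ Seq("}")).mkString("\n")
  }
}
```

Calling `GenInitializeSketch.genInitialize(Seq(true, false), Seq("java.lang.String", "java.lang.Long"))` yields a three-line method whose body initializes state only for the first aggregate. The key type is exactly the point the review question touches: whatever type is chosen (a single field type, a Tuple, or a Row) ends up as the first type argument of the descriptor.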
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3783 @fhueske thanks for the comments. Did we include the latest calcite already?
[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3783#discussion_r125291954 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala --- @@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.api.common.functions.Function import org.apache.flink.types.Row +import org.apache.flink.api.common.functions.RuntimeContext /** * Base class for code-generated aggregations. */ abstract class GeneratedAggregations extends Function { + + /** +* Initialize the state for the distinct aggregation check +* +* @param ctx the runtime context to retrieve and initialize the distinct states +*/ + def initialize(ctx: RuntimeContext) /** -* Sets the results of the aggregations (partial or final) to the output row. --- End diff -- I think it was some formatting issue
[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3783#discussion_r125291702 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -296,6 +299,39 @@ class CodeGenerator( fields.mkString(", ") } +def genInitialize(): String = { + + val sig: String = +j""" + | public void initialize( + |org.apache.flink.api.common.functions.RuntimeContext ctx + | )""".stripMargin + + val initDist: String = if( distinctAggsFlags.isDefined ) { +val statePackage = "org.apache.flink.api.common.state" +val distAggsFlags = distinctAggsFlags.get + for(i <- distAggsFlags.indices) yield +if(distAggsFlags(i)) { + val typeString = javaTypes(aggFields(i)(0)) --- End diff -- sounds good
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user stefanobortoli closed the pull request at: https://github.com/apache/flink/pull/3771
[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3732 @fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR (#3771) against the latest master with the code-generated distinct; please have a look. If it is fine, we can basically support distinct for all the window types.
[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for distinct in the code generator. Please have a look and let me know. I have implemented and tested it only for the OverProcTimeRowBounded window, but if you like it I can quickly implement and test the others as well.
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
GitHub user stefanobortoli opened a pull request: https://github.com/apache/flink/pull/3771 [FLINK-6250] Distinct procTime with Rows boundaries Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/stefanobortoli/flink FLINK-6250b Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3771.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3771
[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3732 @fhueske I have just pushed a version working with code generation (without modifying the code generation). There will be a need for some refactoring in the AggregateUtil function, but if the overall concept is sound, I will fix things. @hongyuhong, @shijinkui you could also have a look if you have time.
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112725482 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.functions + +import java.nio.charset.Charset +import java.util.List + +import org.apache.calcite.rel.`type`._ +import org.apache.calcite.sql._ +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName} +import org.apache.calcite.sql.validate.SqlMonotonicity +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.LeafExpression +import org.apache.calcite.sql.`type`.InferTypes +import org.apache.calcite.sql.validate.SqlValidator +import org.apache.calcite.sql.validate.SqlValidatorScope + +/** + * An SQL Function DISTINCT() used to mark the DISTINCT operator + * on aggregation input. 
This is temporary workaround waiting for + * https://issues.apache.org/jira/browse/CALCITE-1740 being solved + */ +object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION, + ReturnTypes.ARG0, InferTypes.RETURN_TYPE, + OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) { --- End diff -- One never stops learning. :-)
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112725820 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -91,6 +93,22 @@ class DataStreamOverAggregate( val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0) +val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean] --- End diff -- This is a good point. The string trick is anyway a temporary workaround.
[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3732 @fhueske, I agree with you about the risk of temporary DIST() ingestion. Perhaps we could meanwhile just work on the "ProcessFunction + Code generation", keeping the DIST function for test purposes. My concern is that the code may change again and all the work would just be wasted. To be honest, the code generation is quite new to me, and I will have to learn to work on that. Meanwhile, I have almost completed a version that relies on the current code generation, nesting the distinct logic. As it is almost done, I will share this one as well and then, if necessary, move to the code generation. What do you think?
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112713096 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import java.util + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, Preconditions} +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{Accumulator, AggregateFunction} +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{List => JList} + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.common.state.ListState + +class ProcTimeBoundedDistinctRowsOver( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Array[Int]], + private val distinctAggsFlag: Array[Boolean], + private val precedingOffset: Long, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkNotNull(distinctAggsFlag) + Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var smallestTsState: ValueState[Long] = _ + private var distinctValueStateList: Array[MapState[Any, Long]] = _ + + override def open(config: Configuration) { + 
+output = new Row(forwardedFieldCount + aggregates.length) +// We keep the elements received in a Map state keyed +// by the ingestion time in the operator. +// we also keep counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val aggregationStateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) +accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallestTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallestTSState", classOf[Long]) +smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor) +distinctValueStateList = new Array(aggregates.size) +f
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112712717 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,230 @@
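The quoted `ProcTimeBoundedDistinctRowsOver` declares `distinctValueStateList: Array[MapState[Any, Long]]`, but the diff is truncated before that state is used. One common way such a value-to-count map can support DISTINCT over a sliding window is sketched below: a value reaches the aggregate only when its count goes from 0 to 1, and is retracted only when its count drops back to 0. This is an illustration under stated assumptions, not the PR's code. `SumAcc` and `DistinctSum` are hypothetical names, and a plain mutable map stands in for Flink's `MapState`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an accumulator of a retractable SUM aggregate.
final class SumAcc { var sum: Long = 0L }

// Wraps a SUM so that each distinct value in the window is counted once.
final class DistinctSum {
  private val acc = new SumAcc
  // Plays the role of MapState[Any, Long]: value -> occurrences currently in the window.
  private val counts = mutable.Map.empty[Long, Long]

  /** A row enters the window. */
  def accumulate(v: Long): Unit = {
    val c = counts.getOrElse(v, 0L)
    if (c == 0L) acc.sum += v // first occurrence: forward to the aggregate
    counts(v) = c + 1
  }

  /** A row leaves the window. */
  def retract(v: Long): Unit = {
    val c = counts.getOrElse(v, 0L)
    if (c == 1L) {            // last occurrence: retract from the aggregate
      acc.sum -= v
      counts.remove(v)
    } else if (c > 1L) {
      counts(v) = c - 1       // duplicates remain: only decrement the counter
    }
  }

  def value: Long = acc.sum
}
```

With the values 5, 5, 7 in the window the distinct sum is 12; retracting one 5 leaves it at 12, and retracting the second 5 brings it down to 7. Keeping a count rather than a set is what makes retraction safe when duplicates are still inside the window.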
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3574 @fhueske @sunjincheng121 the latest merge caused a conflict, however I have already pushed a new branch and created a new PR. This can be closed.
[GitHub] flink issue #3653: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3653 @fhueske @sunjincheng121 I have merged with the most recent branch, using the function freshly merged. I think the PR is good to merge now.
[GitHub] flink pull request #3653: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
GitHub user stefanobortoli opened a pull request: https://github.com/apache/flink/pull/3653 [FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/huawei-flink/flink FLINK-5653d Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3653.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3653 commit 5ca561b0a3cec68e9386286eb445275ba9b4ce77 Author: Stefano Bortoli <s.bort...@gmail.com> Date: 2017-03-30T09:28:41Z Over aggregation with row range ang procTime semantic
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3574 @fhueske @sunjincheng121 @rtudoran sorry for the extra commit, I saw some comments too little too late. Now the code should comply with the requested changes.
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108703268 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val precedingOffset: Int, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(precedingOffset > 0) + + private var accumulators: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + private var output: Row = _ + private var counterState: ValueState[Long] = _ + private var counter : Long = _ + private var smallestTsState: ValueState[Long] = _ + private var smallestTs : Long = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) +// We keep the elements received in a list state +// together with the ingestion time in the operator +// we also keep 
counter of processed elements +// and timestamp of oldest element +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + +accumulatorState = getRuntimeContext.getState(stateDescriptor) + +val processedCountDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("processedCountState", classOf[Long]) + +counterState = getRuntimeContext.getState(processedCountDescriptor) + +val smallesTimestampDescriptor : ValueStateDescriptor[Long] = + new ValueStateDescriptor[Long]("smallesTSState", classOf[Long]) + +smallestTsState = getRuntimeContext.getState(smallesTimestampDescriptor) + + } + + override def processElement( +input: Row, +ctx: ProcessFunction[Row, Row]#Context, +out: Collector[Row]): Unit = { + +val currentTi
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108626444 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala --- @@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + @Test + def testBoundedNonPartitionedProcessingWindowWithRow() = { +val sql = "SELECT " + + "c, " + + "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + + "CURRENT ROW) as cnt1 " + + "from MyTable" + +val expected = + unaryNode( +"DataStreamCalc", +unaryNode( + "DataStreamOverAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "a", "c", "PROCTIME() AS $2") + ), + term("orderBy", "PROCTIME"), + term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0") +), +term("select", "c", "w0$o0 AS $1") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testBoundedPartitionedProcessingWindowWithRow() = { +val sql = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " + --- End diff -- I have tested the 4 situations that Fabian thought to be sufficient for our purpose. Using 3 or 4 preceding rows changes nothing except the work of manually assembling the test. :-) Unless, of course, the aggregations are not reliable and summing 3 numbers or 4 numbers could have an impact. :-D
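To make the point about the window size concrete, here is a minimal, Flink-free sketch of what `count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)` is expected to produce when rows are processed in arrival order. `BoundedRowsSketch` and `slidingCount` are hypothetical names, and an in-memory queue replaces the keyed state used by the actual process function; the positivity check mirrors `Preconditions.checkArgument(precedingOffset > 0)` from the quoted diffs.

```scala
import scala.collection.mutable

object BoundedRowsSketch {
  /** COUNT over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW,
    * evaluated row by row in arrival order.
    */
  def slidingCount[A](rows: Seq[A], preceding: Int): List[Long] = {
    require(preceding > 0, "preceding offset must be positive")
    val buffer = mutable.Queue.empty[A]     // rows currently inside the window
    val out = mutable.ArrayBuffer.empty[Long]
    var count = 0L
    rows.foreach { row =>
      buffer.enqueue(row)                   // the current row enters: accumulate
      count += 1
      if (buffer.size > preceding + 1) {    // the oldest row leaves: retract
        buffer.dequeue()
        count -= 1
      }
      out += count
    }
    out.toList
  }
}
```

For five input rows, `slidingCount(rows, 2)` gives 1, 2, 3, 3, 3 and `slidingCount(rows, 3)` gives 1, 2, 3, 4, 4. Only the saturation point moves, which is why a test with 3 preceding rows exercises the same accumulate and retract path as one with 2.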
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108625887 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import scala.collection.mutable.Queue +import org.apache.flink.api.common.state.ListStateDescriptor +import org.apache.flink.api.common.state.ListState +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val bufferSize: Int, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(bufferSize > 0) + + private var accumulators: Row = _ + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) + // We keep the elements received in a list state +// together with the ingestion time in the 
operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + +accumulatorState = getRuntimeContext.getState(stateDescriptor) + } + + override def processElement( +input: Row, +ctx: ProcessFunction[Row, Row]#Context, +out: Collector[Row]): Unit = { + +val currentTime = ctx.timerService().currentProcessingTime() +var i = 0 + +var accumulators = accumulatorState.value() +// initialize state for the first processed element +if(accumulators == null){ + accumulators = new Row(aggregates.length) + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } +} + +val keyIter = rowMapState.keys.iterator +var oldestTimeStamp = currentTime +var toRetract: JList[Row] = null +var currentKeyTime: Long = 0L
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108611315
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -195,6 +194,45 @@ class DataStreamOverAggregate(
     result
   }
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+    inputDS: DataStream[Row]): DataStream[Row] = {
+
+    val overWindow: Group = logicWindow.groups.get(0)
+    val partitionKeys: Array[Int] = overWindow.keys.toArray
+    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+    // get the output types
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    // window size is lowerbound +1 to comply with over semantics
+    val lowerbound: Int = AggregateUtil.getLowerBoundary(
--- End diff --
sure
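To make the "window size is lowerbound +1" comment in the diff above concrete, here is a minimal, Flink-free sketch in plain Java. It assumes that `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` covers the current row plus the two rows before it, i.e. three rows in total. The class `RowsWindowCount` and its methods are hypothetical names chosen for illustration and are not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration, not code from the PR: COUNT(a) OVER
// (ROWS BETWEEN n PRECEDING AND CURRENT ROW) evaluated on an in-memory list.
final class RowsWindowCount {

    // The window holds the current row plus `lowerBound` preceding rows.
    static int windowSize(int lowerBound) {
        return lowerBound + 1;
    }

    // Emits one count per input row. Null values occupy a slot in the window
    // but are not counted, as with COUNT(column) in SQL.
    static List<Long> countOver(List<Integer> values, int lowerBound) {
        int size = windowSize(lowerBound);
        // Each entry records whether the buffered value was non-null.
        Deque<Boolean> window = new ArrayDeque<>();
        long count = 0;
        List<Long> result = new ArrayList<>();
        for (Integer v : values) {
            if (window.size() == size) {
                // Window is full: retract the oldest row before adding the new one.
                if (window.removeFirst()) {
                    count--;
                }
            }
            boolean nonNull = v != null;
            window.addLast(nonNull);
            if (nonNull) {
                count++;
            }
            result.add(count);
        }
        return result;
    }
}
```

With a lower bound of 2 and five non-null input rows, the emitted counts are 1, 2, 3, 3, 3: the window fills up over the first three rows and then stays at three.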
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108611259
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
@@ -696,6 +696,181 @@ class SqlITCase extends StreamingWithStateTestBase {
       "6,8,Hello world,51,9,5,9,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  //
+  // START TESTING BOUNDED PROC TIME ROW AGGREGATION
+  //
+
+
+  @Test
+  def testSumMinAggregatation2(): Unit = {
--- End diff --
@fhueske suggested 4 tests, and 4 is what I implemented. This is the 5th time I have implemented the tests, so we can leave it like that. :-)
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108610311
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
@@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
--- End diff --
why?
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108610149 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import scala.collection.mutable.Queue +import org.apache.flink.api.common.state.ListStateDescriptor +import org.apache.flink.api.common.state.ListState +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val bufferSize: Int, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(bufferSize > 0) + + private var accumulators: Row = _ + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) + // We keep the elements received in a list state +// together with the ingestion time in the 
operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + +accumulatorState = getRuntimeContext.getState(stateDescriptor) + } + + override def processElement( +input: Row, +ctx: ProcessFunction[Row, Row]#Context, +out: Collector[Row]): Unit = { + +val currentTime = ctx.timerService().currentProcessingTime() +var i = 0 + +var accumulators = accumulatorState.value() +// initialize state for the first processed element +if(accumulators == null){ + accumulators = new Row(aggregates.length) + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } +} + +val keyIter = rowMapState.keys.iterator +var oldestTimeStamp = currentTime +var toRetract: JList[Row] = null +var currentKeyTime: Long =
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108610034 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import scala.collection.mutable.Queue +import org.apache.flink.api.common.state.ListStateDescriptor +import org.apache.flink.api.common.state.ListState +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val bufferSize: Int, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(bufferSize > 0) + + private var accumulators: Row = _ + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) + // We keep the elements received in a list state +// together with the ingestion time in the 
operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + +accumulatorState = getRuntimeContext.getState(stateDescriptor) + } + + override def processElement( +input: Row, +ctx: ProcessFunction[Row, Row]#Context, +out: Collector[Row]): Unit = { + +val currentTime = ctx.timerService().currentProcessingTime() +var i = 0 + +var accumulators = accumulatorState.value() +// initialize state for the first processed element +if(accumulators == null){ + accumulators = new Row(aggregates.length) + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } +} + +val keyIter = rowMapState.keys.iterator +var oldestTimeStamp = currentTime +var toRetract: JList[Row] = null --- End diff -- details, but o
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108609894 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueStateDescriptor +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import scala.collection.mutable.Queue +import org.apache.flink.api.common.state.ListStateDescriptor +import org.apache.flink.api.common.state.ListState +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +class BoundedProcessingOverRowProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val bufferSize: Int, + private val forwardedFieldCount: Int, + private val aggregatesTypeInfo: RowTypeInfo, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + Preconditions.checkArgument(bufferSize > 0) + + private var accumulators: Row = _ + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { + +output = new Row(forwardedFieldCount + aggregates.length) + // We keep the elements received in a list state +// together with the ingestion time in the 
operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) + +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo) + +accumulatorState = getRuntimeContext.getState(stateDescriptor) + } + + override def processElement( +input: Row, +ctx: ProcessFunction[Row, Row]#Context, +out: Collector[Row]): Unit = { + +val currentTime = ctx.timerService().currentProcessingTime() +var i = 0 + +var accumulators = accumulatorState.value() +// initialize state for the first processed element +if(accumulators == null){ + accumulators = new Row(aggregates.length) + while (i < aggregates.length) { +accumulators.setField(i, aggregates(i).createAccumulator()) +i += 1 + } +} + +val keyIter = rowMapState.keys.iterator +var oldestTimeStamp = currentTime +var toRetract: JList[Row] = null +var currentKeyTime: Long = 0L
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user stefanobortoli commented on the issue: https://github.com/apache/flink/pull/3574 @fhueske @sunjincheng121 @rtudoran I have just completed the implementation with the MapState, please have a look.
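The MapState-based buffering mentioned here can be sketched without Flink. The following is a minimal illustration in plain Java, assuming (as the quoted diffs suggest) that rows are stored per processing timestamp and that the oldest row is retracted once `bufferSize` rows are held. A `HashMap` stands in for `MapState[Long, JList[Row]]` and a running sum stands in for the accumulator row; the class `BoundedRowBuffer` and its members are hypothetical names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of the buffering idea: values are grouped
// by the timestamp at which they arrived, and the oldest one is retracted
// when the buffer is full.
final class BoundedRowBuffer {
    private final int bufferSize;
    private final Map<Long, List<Long>> rowsByTime = new HashMap<>();
    private int buffered = 0;
    private long sum = 0L;

    BoundedRowBuffer(int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        this.bufferSize = bufferSize;
    }

    // Adds one value observed at `timestamp` and returns the sum over the
    // most recent `bufferSize` values, including the new one.
    long add(long timestamp, long value) {
        if (buffered == bufferSize) {
            retractOldest();
        }
        rowsByTime.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
        buffered++;
        sum += value;
        return sum;
    }

    private void retractOldest() {
        // The map is unordered, so scan the keys for the smallest timestamp.
        long oldest = Long.MAX_VALUE;
        for (long t : rowsByTime.keySet()) {
            oldest = Math.min(oldest, t);
        }
        List<Long> rows = rowsByTime.get(oldest);
        // Values that share a timestamp are retracted in arrival order.
        sum -= rows.remove(0);
        buffered--;
        if (rows.isEmpty()) {
            rowsByTime.remove(oldest);
        }
    }
}
```

Keeping a list per timestamp matters because several rows can arrive within the same millisecond of processing time; a plain map from timestamp to a single row would silently drop all but one of them.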
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108398986
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
@@ -239,4 +239,61 @@ class WindowAggregateTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
--- End diff --
ok, now it works as you suggested
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108372957
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
@@ -239,4 +239,61 @@ class WindowAggregateTest extends TableTestBase {
       )
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
+          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+        ),
+        term("select", "c", "w0$o0 AS $1")
+      )
+    streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+    val sql = "SELECT " +
+      "c, " +
+      "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+      "CURRENT ROW) as cnt1 " +
+      "from MyTable"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamOverAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "a", "c", "PROCTIME() AS $2")
+          ),
+          term("partitionBy", "c"),
+          term("orderBy", "PROCTIME"),
+          term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
--- End diff --
If I add the value, the test won't pass. That is how the query is parsed in Calcite: constants have to be resolved to get the lower boundary. I'm not sure I got your point.
[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/3574#discussion_r108369391
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateOverWindowFunction.scala ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class IncrementalAggregateOverWindowFunction[W <: Window](
+  private val numGroupingKey: Int,
+  private val numAggregates: Int,
+  private val forwardedFieldCount: Int)
+extends RichWindowFunction[Row, Row, Tuple, W] {
+
+  private var output: Row = _
+  private var reuse: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    output = new Row(forwardedFieldCount + numAggregates)
+  }
+  override def apply(
+    key: Tuple,
+    window: W,
+    records: Iterable[Row],
+    out: Collector[Row]): Unit = {
+
+    var i = 0
+    val iter = records.iterator
+    while (iter.hasNext) {
+      reuse = iter.next
--- End diff --
"extends" and "implements" are different in Java; really, this was just my confusion with Scala. :-)
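For readers who share the confusion mentioned above: Java distinguishes `extends` (a single superclass) from `implements` (any number of interfaces), whereas Scala writes `extends` for the first parent, whether it is a class or a trait, and `with` for any further traits. A minimal Java sketch follows; all type names in it are made up for illustration and none of them exist in Flink.

```java
// Hypothetical types for illustration only.
interface Describable {
    String describe();
}

abstract class BaseFunction {
    abstract int apply(int x);
}

// Java: the superclass is named with `extends`, the interface with `implements`.
// The closest Scala form would be:
//   class Doubler extends BaseFunction with Describable
final class Doubler extends BaseFunction implements Describable {
    @Override
    int apply(int x) {
        return 2 * x;
    }

    @Override
    public String describe() {
        return "doubles its input";
    }
}
```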
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user stefanobortoli commented on the pull request: https://github.com/apache/flink/pull/1915#issuecomment-216521677 @tillrohrmann, I see the conflicts. How should I deal with them? Should I rebase?
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/1915#discussion_r61855824
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/StartupUtils.java ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.List;
+
+public class StartupUtils {
+
+  /**
+   * A utility method to analyze the exceptions and collect the clauses
+   *
+   * @param e the root exception (Throwable) object
+   * @param causes the list of exceptions that caused the root exceptions
+   * @return a list of Throwable
+   */
+  public List getExceptionCauses(Throwable e, List causes) {
--- End diff --
on it
[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests
Github user stefanobortoli commented on a diff in the pull request: https://github.com/apache/flink/pull/1915#discussion_r61037816 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java --- @@ -53,41 +59,55 @@ public void testStartupWhenTaskmanagerActorPortIsUsed() { blocker = new ServerSocket(0, 50, localAddress); final int port = blocker.getLocalPort(); - try { - TaskManager.runTaskManager( - localHostName, - ResourceID.generate(), - port, - new Configuration(), + TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(), TaskManager.class); - fail("This should fail with an IOException"); - } - catch (IOException e) { - // expected. validate the error message - assertNotNull(e.getMessage()); - assertTrue(e.getMessage().contains("Address already in use")); + fail("This should fail with an IOException"); + + } catch (IOException e) { + // expected. validate the error messagex + List causes = getExceptionCauses(e, new ArrayList()); + for (Throwable cause : causes) { + if (cause instanceof BindException) { + throw (BindException) cause; + } } - - } - catch (Exception e) { + fail("This should fail with an exception caused by BindException"); + } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); - } - finally { + } finally { if (blocker != null) { try { blocker.close(); - } - catch (IOException e) { + } catch (IOException e) { // no need to log here } } } } /** -* Tests that the TaskManager startup fails synchronously when the I/O directories are -* not writable. 
+* A utility method to analyze the exceptions and collect the clauses +* +* @param e +*the root exception (Throwable) object +* @param causes +*the list of exceptions that caused the root exceptions +* @return +*/ + private List getExceptionCauses(Throwable e, List causes) { --- End diff -- sure I can do that
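The diff above replaces a check on the localized message text ("Address already in use") with a walk over the exception's cause chain looking for a `BindException`. The body of `getExceptionCauses` is not shown in the quoted diff, so the following is only a minimal sketch of that idea. The class name `ExceptionCauses`, the helper `isCausedBy`, and the single-argument, generic signature are assumptions made for illustration, not the code from the pull request.

```java
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class; the name and signatures are illustrative assumptions.
public class ExceptionCauses {

    /** Collects the causes of {@code root} (excluding root itself), outermost cause first. */
    public static List<Throwable> getExceptionCauses(Throwable root) {
        List<Throwable> causes = new ArrayList<>();
        Throwable current = root.getCause();
        // Stop at the end of the chain, or if a cause repeats, to avoid looping forever.
        while (current != null && !causes.contains(current)) {
            causes.add(current);
            current = current.getCause();
        }
        return causes;
    }

    /** Returns true if {@code root} or any exception in its cause chain is an instance of {@code type}. */
    public static boolean isCausedBy(Throwable root, Class<? extends Throwable> type) {
        if (type.isInstance(root)) {
            return true;
        }
        for (Throwable cause : getExceptionCauses(root)) {
            if (type.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // An IOException wrapping a BindException, similar to a failed port binding.
        IOException wrapped = new IOException("startup failed", new BindException("port in use"));
        System.out.println(isCausedBy(wrapped, BindException.class)); // prints true
    }
}
```

Checking the exception type rather than the message keeps such a test independent of the JVM's locale, which matches the stated goal of removing references to localized messages.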
[GitHub] flink pull request: Flink 1827 and small fixes in some tests
Github user stefanobortoli commented on the pull request: https://github.com/apache/flink/pull/1915#issuecomment-212305774 We had a look at the Flink test dependency hierarchies. The change is feasible, but I think it should be done in a separate PR. In fact, it requires a clerical review of all projects and better management of the tests' transitive dependencies, which are currently very redundant. For example, if I need _flink-optimizer_, I do not also need to declare the dependencies on _flink-core_ and _flink-runtime_, because they are included transitively.
[GitHub] flink pull request: Flink 1827 and small fixes in some tests
Github user stefanobortoli commented on the pull request: https://github.com/apache/flink/pull/1915#issuecomment-212293101 We mainly wanted to simplify the build when tests are skipped. However, if you think it makes sense, we can give it a try.
[GitHub] flink pull request: Flink 1827 and small fixes in some tests
GitHub user stefanobortoli opened a pull request: https://github.com/apache/flink/pull/1915 Flink 1827 and small fixes in some tests Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [X] General - The pull request references the related JIRA issue - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message - [X] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [X] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/stefanobortoli/flink FLINK-1827 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1915.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1915 commit 88483205c57884303c9e46159df9d89aa953b547 Author: okkam <okkam@okkamvm> Date: 2016-04-19T09:50:59Z FLINK-1827 fixed compilation that skips tests commit 5a5cc98ba4fc407fe0fc18d7a57dc4455ce80001 Author: okkam <okkam@okkamvm> Date: 2016-04-19T09:50:59Z FLINK-1827 fixed compilation that skips tests commit 85b1763ea84a59258a3a4fef87d05650e99b5be0 Author: okkam <okkam@okkamvm> Date: 2016-04-19T09:58:13Z Merge remote-tracking branch 'origin/FLINK-1827' into FLINK-1827 Conflicts: flink-test-utils/pom.xml 
commit 310b4e901283fde317dc97c9d454341d331f5d04 Author: okkam <okkam@okkamvm> Date: 2016-04-19T16:37:55Z FLINK-1827 and improved tests removing references to localized messages