[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @fhueske , this is the PR with the code generated distinct aggregation for OVER. You mentioned that the value of the aggregation should be a Row, but what is kept in the distinct state is just the event value, not its "aggregation value state". Perhaps you can try to explain it better to me so that I can complete this PR and we can move on. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @rtudoran @fhueske The first implementation I made kept the state in the ProcessFunction, without a code generated aggregation function. Second, I pushed a branch with the state in the process function, using the code generated process function. Third, I moved the state into the code generated function. It is not clear to me why the state cannot be within the code generated function. Could you please clarify, so that we can understand whether it is worth working around it? This feature is quite important for us. Anyway, you could have a look at the branch that keeps the state in the process function and uses the code generated aggregation functions. Basically, rather than generating one code generated function for all the aggregations, I create one class for each aggregation, and then, in the process function, I call the corresponding accumulate/retract through the distinct logic when the aggregation is marked as distinct.
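The "distinct logic" around accumulate/retract mentioned in this comment can be pictured as a reference count per event value. What follows is only a minimal sketch in plain Java, not the code from the PR: the class `DistinctSum`, its fields, and the use of a `HashMap` in place of Flink's `MapState` are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of SUM(DISTINCT x) over a sliding window.
// A plain HashMap stands in for the keyed MapState used in the PR.
class DistinctSum {
    // event value -> number of rows currently in the window carrying that value
    private final Map<Long, Long> distinctCounts = new HashMap<>();
    private long sum = 0L;

    // Called when a row enters the window.
    void accumulate(long value) {
        Long count = distinctCounts.get(value);
        if (count == null) {
            // First occurrence: forward the value to the underlying aggregate.
            sum += value;
            distinctCounts.put(value, 1L);
        } else {
            // Duplicate: only bump the reference count.
            distinctCounts.put(value, count + 1);
        }
    }

    // Called when a row leaves the window.
    void retract(long value) {
        Long count = distinctCounts.get(value);
        if (count == null) {
            return; // value was never accumulated, nothing to retract
        }
        if (count == 1L) {
            // Last occurrence leaves the window: retract from the aggregate.
            sum -= value;
            distinctCounts.remove(value);
        } else {
            distinctCounts.put(value, count - 1);
        }
    }

    long getValue() {
        return sum;
    }
}
```

In this sketch the distinct state holds only the event value and a counter, while the aggregate's own accumulator (`sum` here) is kept separately, which matches the description that the distinct state stores the event value rather than the aggregation state.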
[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske I have created #3783 with just the code generation part. At least the GROUP BY distinct can move ahead. I will close this PR and wait for the merging of the Calcite fix.
[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3783 @fhueske please have a look at this PR, it contains just the code generation part with optional distinct.
[GitHub] flink pull request #3783: [FLINK-6338] Add support for DISTINCT into Code Ge...
GitHub user huawei-flink opened a pull request: https://github.com/apache/flink/pull/3783

[FLINK-6338] Add support for DISTINCT into Code Generated Aggregations

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/stefanobortoli/flink FLINK-6338

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3783.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3783

commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-04-26T17:22:04Z

    Added code generation distinct aggregation logic
[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3771 So, what do you want me to keep for this PR? Just the code generation and its test?
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472166

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
       genAggregations.code)
     LOG.debug("Instantiating AggregateHelper.")
     function = clazz.newInstance()
-
+
+    var initialized = false
+    for(i <- distinctAggFlags.indices){
+      if(distinctAggFlags(i) && !initialized){
+        function.initialize(getRuntimeContext())
--- End diff --

right!
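The loop quoted in this diff calls `initialize` at most once, and only when at least one aggregate is flagged as distinct. The same intent can be written as a single any-match check. The sketch below is an assumption about how that could look, not what the reviewer proposed or what was merged; `InitOnce`, `Helper`, and `open` are hypothetical names, and `Helper` merely counts calls instead of touching a runtime context.

```java
import java.util.stream.IntStream;

class InitOnce {
    // Hypothetical stand-in for the generated aggregation helper.
    static class Helper {
        int initializeCalls = 0;

        void initialize() {
            initializeCalls++;
        }
    }

    // True when at least one aggregate is flagged as DISTINCT.
    static boolean anyDistinct(boolean[] distinctAggFlags) {
        return IntStream.range(0, distinctAggFlags.length)
                .anyMatch(i -> distinctAggFlags[i]);
    }

    // Mirrors the open() logic: initialize the distinct state once, and only if needed.
    static Helper open(boolean[] distinctAggFlags) {
        Helper function = new Helper();
        if (anyDistinct(distinctAggFlags)) {
            function.initialize();
        }
        return function;
    }
}
```

Expressing the condition this way removes the mutable `initialized` flag, so there is no flag that must be remembered and set inside the loop.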
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472029

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext

 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
+
+  /**
+    * Initialize the state for the distinct aggregation check
+    *
+    * @param ctx the runtime context to retrieve and initialize the distinct states
+    */
+  def initialize(ctx: RuntimeContext)

   /**
-    * Sets the results of the aggregations (partial or final) to the output row.
--- End diff --

Probably an error in the merging. Sorry about that.
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471605

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -335,14 +371,28 @@ class CodeGenerator(
       j"""
          |  public final void accumulate(
          |    org.apache.flink.types.Row accs,
-         |    org.apache.flink.types.Row input)""".stripMargin
+         |    org.apache.flink.types.Row input) throws Exception""".stripMargin

     val accumulate: String = {
       for (i <- aggs.indices) yield
-        j"""
+        if(distinctAggsFlags(i)){
+          j"""
+             |  Long distValCount$i = (Long) distStateList[$i].get(${parameters(i)});
+             |  if( distValCount$i == null){
--- End diff --

In Scala it is 0L, in Java it is null. :-/
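The difference noted in this comment comes from how the two languages treat a missing map entry typed as a long. In Java, a lookup that finds nothing returns a `null` boxed `Long`, so the generated code has to test for `null`; in Scala, `Long` is a value type and a `null` coerced to it reads as `0L`. The sketch below shows only the Java side, using a `HashMap` as an assumed stand-in for the map state; `MissingKeyDemo` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class MissingKeyDemo {
    // Returns the stored count, or null when the key has never been seen.
    static Long lookup(Map<String, Long> state, String key) {
        return state.get(key);
    }

    // Null-safe variant: treats a missing key as a count of zero.
    static long lookupOrZero(Map<String, Long> state, String key) {
        Long count = state.get(key);
        return count == null ? 0L : count;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        System.out.println(lookup(state, "a"));       // missing key yields a null reference
        System.out.println(lookupOrZero(state, "a")); // null-safe lookup yields zero
    }
}
```

Assigning the result of `lookup` directly to a primitive `long` would throw a `NullPointerException` for a missing key, which is why the `== null` check in the generated Java code is needed.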
[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471045

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +297,41 @@ class CodeGenerator(
       fields.mkString(", ")
     }

+    def genInitialize(existDistinct : Boolean): String = {
+
+      val sig: String =
+        j"""
+           | org.apache.flink.api.common.state.MapState[] distStateList =
+           |   new org.apache.flink.api.common.state.MapState[ ${distinctAggsFlags.size} ];
+           |
+           | public void initialize(
+           |   org.apache.flink.api.common.functions.RuntimeContext ctx
+           | )""".stripMargin
+      if(existDistinct){
--- End diff --

you are right
[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3771 @fhueske @haohui I have no problem removing the DIST() part; it is just not possible to test the change without it. Shall I push just the code generation and aggregates util changes?
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink closed the pull request at: https://github.com/apache/flink/pull/3732
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112210389

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+    output = new Row(forwardedFieldCount + aggregates.length)
+    // We keep the elements received in a Map state keyed
+    // by the ingestion time in the operator.
+    // we also keep counter of processed elements
+    // and timestamp of oldest element
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+    accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+    val processedCountDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+    counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+    val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+      new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+    smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+
+    val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+      new MapStateDescriptor[Any, Row]("distinctValuesBu
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112210097 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ [...]
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112180091 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ [...]
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112151646 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ [...]
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3732#discussion_r112151222 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala --- @@ -0,0 +1,238 @@ [...]
[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3732

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6250

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3732.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3732

commit 4e3da4c9baebf48bfc47ef192287e7e17ab69efd
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-04-18T12:27:26Z

DIST() for aggregation on procTime row bounded windows
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink closed the pull request at: https://github.com/apache/flink/pull/3459
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3574

@fhueske Thanks a lot for the clarification. I understand the issue better now, and I see that you are aiming for an average case that works both in memory and on external persistence. Considering RocksDB as the state of the art, your choice sounds much more reasonable. We are well aware of the costs of serialization, and the impact is definitely important. However, low-latency systems with strict SLAs will likely run just in memory. The O(n) of the MapState is guaranteed by the fact that time is monotonic, so sequential reading is driven by the key timestamp. The cost of each O(1) access in the hash map increases with the size of the window, though, as you need to search through the map index. We definitely need better data access patterns for the state of "time series" types of data. I will try to internalize it and provide the MapState implementation.
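
The MapState-style buffering discussed in this comment can be sketched roughly as follows. This is a minimal in-memory illustration, not code from the PR: it assumes a plain `HashMap` stands in for Flink's `MapState`, that processing timestamps arrive in non-decreasing order, and that the aggregate is a simple SUM. The class `TimestampKeyedWindowSum` and all of its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a MapState-backed row buffer; not Flink code. */
class TimestampKeyedWindowSum {
    private final int windowSize;                                   // rows kept, including the current row
    private final Map<Long, List<Long>> rowsByTs = new HashMap<>(); // plays the role of MapState
    private long smallestTs = Long.MAX_VALUE;                       // timestamp of the oldest buffered row
    private int count = 0;                                          // number of buffered rows
    private long sum = 0;                                           // eagerly maintained aggregate

    TimestampKeyedWindowSum(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds a value seen at processing time ts (assumed non-decreasing); returns the window sum. */
    long add(long ts, long value) {
        if (count == windowSize) {
            retractOldest();
        }
        rowsByTs.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);
        if (ts < smallestTs) {
            smallestTs = ts;
        }
        count++;
        sum += value;
        return sum;
    }

    /** Retracts the oldest row; needs a key lookup and, sometimes, a scan for the next key. */
    private void retractOldest() {
        List<Long> oldest = rowsByTs.get(smallestTs);
        sum -= oldest.remove(0);
        count--;
        if (oldest.isEmpty()) {
            rowsByTs.remove(smallestTs);
            long next = Long.MAX_VALUE;
            for (long key : rowsByTs.keySet()) {   // "key resolution" over the remaining timestamps
                next = Math.min(next, key);
            }
            smallestTs = next;
        }
    }
}
```

With a window of three rows (2 PRECEDING AND CURRENT ROW), feeding the values 10, 20, 30, 40, 50 yields the running sums 10, 30, 60, 90, 120. The scan in `retractOldest` is where the per-window cost mentioned in the comment shows up in this simplified model.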
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3574

Hi @fhueske, @sunjincheng121, let me try to explain my perspective on this specific case (row based, proc time). This is for the purpose of discussion, to show that we have been thinking about this topic for a while now.

In the case of the row range, the "serialization savings" coming from MapState exist only up to the point at which the "buffer" is filled. After that, we need to start retracting to keep the value correct, and to do that we need to deserialize all the objects. As @rtudoran mentioned, we implemented a version using a Queue object. This has many advantages:
- it removes the object from the buffer at the right moment, freeing memory on the go (without any iteration over the key set)
- it has an O(1) data access pattern, without any "key resolution costs" and no list iteration
- it keeps the natural processing order by design, without the need to index objects with timestamps
- the experiments we ran show no difference for windows up to 100k elements; beyond that, the queue seems to be more efficient (as the key resolution does not come for free).

The map state may have a slight advantage in the early stages, when the window is not filled, but after that it just introduces useless operations. Furthermore, the need to index objects with a creation timestamp (more memory wasted) and to deal with sequential access (List) to get the most recent object, when you can actually just use the natural arrival order, seems a useless complication. Applying Occam's razor, there should be no doubt about which solution we should select first. The serialization optimization while the window gets filled sounds like a premature optimization that is not worth it in the long run. Further implementations of SQL operators (e.g. LIMIT, OFFSET, etc.) can benefit from the fact that the state is already sorted, whereas the map would need to be sorted all the time.

Of course, I am talking specifically about the procTime semantic operations; eventTime is another story anyway. The map state has minor advantages in the beginning (as the serialization costs are small anyway), while the queue state has advantages in executions running steadily, because of its access pattern and natural buffer cleansing.

These are my two cents on the discussion.
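
The queue-based buffer argued for here can be illustrated with a small sketch. It is not the PR's implementation: it assumes an in-memory `ArrayDeque` in place of any Flink state, a SUM aggregate, and hypothetical names (`QueueWindowSum`, `add`).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical queue-backed row window; arrival order is the only index. Not Flink code. */
class QueueWindowSum {
    private final int windowSize;                      // rows kept, including the current row
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long sum = 0;                              // eagerly maintained aggregate

    QueueWindowSum(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds one value in arrival order and returns the sum over the last windowSize rows. */
    long add(long value) {
        if (buffer.size() == windowSize) {
            sum -= buffer.removeFirst();               // O(1) retraction of the oldest row
        }
        buffer.addLast(value);
        sum += value;
        return sum;
    }
}
```

No timestamp is stored and no key has to be resolved: the head of the queue is always the next row to retract. What this sketch leaves out is the serialization cost of keeping the whole queue in a single state object, which is the concern raised on the other side of the discussion.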
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3574 I have a first implementation of the ProcessFunction, using a Queue as state. However, I still need to implement the retractable aggregation, as AVG, for example, is not supported.
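
To make "retractable aggregation" concrete, here is a minimal sketch of an AVG accumulator that supports both accumulate and retract. It is an illustration under stated assumptions, not Flink's `AggregateFunction` API: the class `RetractableAvg` and its method names are hypothetical, and values are plain `long`s.

```java
/** Hypothetical retractable AVG accumulator; keeps sum and count so rows can be removed again. */
class RetractableAvg {
    private long sum = 0;
    private long count = 0;

    /** Adds a row entering the window. */
    void accumulate(long value) {
        sum += value;
        count++;
    }

    /** Removes a row leaving the window; the caller must only retract values it accumulated. */
    void retract(long value) {
        sum -= value;
        count--;
    }

    /** Returns the current average, or null when the window is empty. */
    Double getValue() {
        if (count == 0) {
            return null;
        }
        return (double) sum / count;
    }
}
```

AVG is retractable because sum and count can both be undone. MIN and MAX are harder: once the current extreme leaves the window, the accumulator alone cannot tell what the next one is.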
[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3574 @fhueske let's see if the third attempt works. I have included all the comments of @sunjincheng121 (apart from the window function), and since I did the merge on a fresh checkout, the imports should also be fine.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink closed the pull request at: https://github.com/apache/flink/pull/3547
[GitHub] flink pull request #3574: Eager aggregation over row bounded window
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3574

Eager aggregation over row bounded window

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653c

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3574.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #3574
[GitHub] flink issue #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3547 Sorry about the mess. I don't understand the mess the rebase makes with Eclipse... I will close this PR and open another one including all the changes and comments.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106917566

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
@@ -17,35 +17,62 @@
  */
 package org.apache.flink.table.runtime.aggregate

-import java.util
+import scala.collection.JavaConversions.asScalaBuffer
--- End diff --

I need that import; without it, the code does not build.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106868683

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ProcTimeRowBoundedAggregationTest.scala ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+import org.apache.flink.types.Row
+
+class ProcTimeRowBoundedAggregationTest extends StreamingWithStateTestBase {
--- End diff --

@fhueske Should I add all the unit tests under the SqlITCase class?
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106616178

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -106,9 +113,14 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} // lowerbound is a BasicType and upperbound is PRECEDING or CURRENT ROW
+else if (overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType]
--- End diff --

I guess that is also a way to do it. The check makes it possible to distinguish between time-bounded and row-bounded windows. I have no particular attachment to my solution; it just worked. I will apply and test yours as well.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106615708

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -17,34 +17,41 @@
  */
 package org.apache.flink.table.plan.nodes.datastream

-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
--- End diff --

@fhueske Sorry about that. I will be more careful in the next one.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106465657

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class BoundedProcessingOverWindowFunction[W <: Window](
--- End diff --

I just thought about a case where one wants to do a COUNT DISTINCT type of aggregation. How would a ProcessFunction work for that?
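
One possible answer to the COUNT DISTINCT question, sketched outside of Flink: keep, next to the row buffer, a map from each value to the number of times it currently occurs in the window, and count a value as distinct while that number is above zero. This is only an illustration under assumptions (in-memory collections instead of Flink state, `String` values, a row-bounded window); `DistinctCountWindow` and its members are hypothetical names, not the approach taken in any of the PRs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical COUNT(DISTINCT) over the last windowSize rows, using per-value occurrence counts. */
class DistinctCountWindow {
    private final int windowSize;                                   // rows kept, including the current row
    private final Deque<String> buffer = new ArrayDeque<>();        // rows in arrival order
    private final Map<String, Integer> occurrences = new HashMap<>(); // value -> occurrences in window

    DistinctCountWindow(int windowSize) {
        this.windowSize = windowSize;
    }

    /** Adds one value and returns the number of distinct values currently in the window. */
    long add(String value) {
        if (buffer.size() == windowSize) {
            String evicted = buffer.removeFirst();
            // Decrement the occurrence count; returning null drops the entry when it reaches zero.
            occurrences.computeIfPresent(evicted, (k, n) -> n == 1 ? null : n - 1);
        }
        buffer.addLast(value);
        occurrences.merge(value, 1, Integer::sum);
        return occurrences.size();
    }
}
```

The same occurrence map can gate other aggregates: accumulate a value only when its count goes from 0 to 1, and retract it only when the count returns to 0.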
[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106453475

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+//org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+    private val aggregates: Array[AggregateFunction[_]],
+    private val aggFields: Array[Int],
+    private val forwardedFieldCount: Int)
+  extends RichAllWindowFunction[Row, Row, W] {
+
+  private var output: Row = _
+  private var accumulators: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+    output = new Row(forwardedFieldCount + aggregates.length)
+    accumulators = new Row(aggregates.length)
+    var i = 0
+    while (i < aggregates.length) {
+      accumulators.setField(i, aggregates(i).createAccumulator())
+      i = i + 1
+    }
+  }
+
+  override def apply(
+      window: W,
+      records: Iterable[Row],
+      out: Collector[Row]): Unit = {
+
+    var i = 0
+    // initialize the values of the aggregators by re-creating them
+    // the design of the Accumulator interface should be extended to enable
+    // a reset function for better performance
+    while (i < aggregates.length) {
+      accumulators.setField(i, aggregates(i).createAccumulator())
--- End diff --

I will also apply this fix in my function.
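
The code comment in the diff above wishes for a reset function on the accumulator, so that a new accumulator does not have to be created for every window. A minimal sketch of that idea, assuming a hypothetical `reset()` method (the quoted comment suggests the Accumulator interface did not offer one at the time) and hypothetical class names:

```java
import java.util.List;

/** Hypothetical accumulator with a reset hook; this is not Flink's Accumulator interface. */
class ResettableSum {
    private long sum = 0;

    void accumulate(long value) {
        sum += value;
    }

    /** Clears the accumulated state so the same instance can serve the next window. */
    void reset() {
        sum = 0;
    }

    long getValue() {
        return sum;
    }
}

/** Evaluates each window from scratch, reusing one accumulator instead of re-creating it. */
class WindowEvaluator {
    private final ResettableSum acc = new ResettableSum(); // allocated once

    long evaluate(List<Long> windowRecords) {
        acc.reset();
        for (long value : windowRecords) {
            acc.accumulate(value);
        }
        return acc.getValue();
    }
}
```

The saving is one allocation per aggregate per window firing; whether that matters in practice would need to be measured.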
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106452550 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.java.stream.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.api.java.stream.utils.StreamTestData; +import org.apache.flink.table.api.scala.stream.utils.StreamITCase; +import org.apache.flink.types.Row; +import org.junit.Ignore; +import org.junit.Test; + +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase { + + + @Test + public void testMaxAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List expected = new ArrayList<>(); + expected.add("1,0"); + expected.add("2,1"); + expected.add("2,2"); + expected.add("3,3"); + expected.add("3,4"); + expected.add("3,5"); + expected.add("4,6"); + expected.add("4,7"); + expected.add("4,8"); + expected.add("4,9"); + expected.add("5,10"); + expected.add("5,11"); + 
expected.add("5,12"); + expected.add("5,14"); + expected.add("5,14"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testMinAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + +
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106369028 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java --- @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.java.stream.sql; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.api.java.stream.utils.StreamTestData; +import org.apache.flink.table.api.scala.stream.utils.StreamITCase; +import org.apache.flink.types.Row; +import org.junit.Ignore; +import org.junit.Test; + +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase { + + + @Test + public void testMaxAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List expected = new ArrayList<>(); + expected.add("1,0"); + expected.add("2,1"); + expected.add("2,2"); + expected.add("3,3"); + expected.add("3,4"); + expected.add("3,5"); + expected.add("4,6"); + expected.add("4,7"); + expected.add("4,8"); + expected.add("4,9"); + expected.add("5,10"); + expected.add("5,11"); + 
expected.add("5,12"); + expected.add("5,14"); + expected.add("5,14"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testMinAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + +
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106367506

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

Why should the test also be implemented in Scala?
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106367130 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -130,32 +142,76 @@ class DataStreamOverAggregate( val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] val result: DataStream[Row] = -// partitioned aggregation -if (partitionKeys.nonEmpty) { - val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction( -namedAggregates, -inputType) + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction( + namedAggregates, + inputType) - inputDS +inputDS .keyBy(partitionKeys: _*) .process(processFunction) .returns(rowTypeInfo) .name(aggOpName) .asInstanceOf[DataStream[Row]] -} -// non-partitioned aggregation -else { - val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction( -namedAggregates, -inputType, -false) - - inputDS - .process(processFunction).setParallelism(1).setMaxParallelism(1) -.returns(rowTypeInfo) -.name(aggOpName) -.asInstanceOf[DataStream[Row]] -} + } // non-partitioned aggregation + else { +val processFunction = AggregateUtil.CreateUnboundedProcessingOverProcessFunction( + namedAggregates, + inputType, + false) + +inputDS + .process(processFunction).setParallelism(1).setMaxParallelism(1) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } +result + } + + def createBoundedAndCurrentRowProcessingTimeOverWindow( +inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +// get the output types +val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val lowerbound: Int = AggregateUtil.getLowerBoundary( + logicWindow.constants, + overWindow.lowerBound, + getInput()) + +val result: DataStream[Row] = + // partitioned aggregation + if (partitionKeys.nonEmpty) { +val windowFunction = AggregateUtil.CreateBoundedProcessingOverWindowFunction( + namedAggregates, + inputType) +inputDS + .keyBy(partitionKeys: _*) + .countWindow(lowerbound,1) + .apply(windowFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] + } // global non-partitioned aggregation + else { +val windowFunction = AggregateUtil.CreateBoundedProcessingOverGlobalWindowFunction( + namedAggregates, + inputType) + +inputDS + .countWindowAll(lowerbound,1) --- End diff -- So, the semantic of between 2 rows and current row does not include the current row and I should count 3 elements? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
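For reference, in standard SQL a frame declared as `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` does include the current row, so it covers at most three rows. The sketch below illustrates that reading in plain Java; it uses no Flink classes, and the class name `RowsFrameSketch` and the method `slidingMax` are made up for illustration. Under this reading, a count-based sliding window that should emulate the frame would presumably need a size of `preceding + 1`, which is the point the question above is probing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Illustrative only: plain Java, no Flink API. Names are hypothetical.
class RowsFrameSketch {

    // Computes MAX over a frame made of `preceding` earlier rows plus the current row.
    static List<Integer> slidingMax(List<Integer> values, int preceding) {
        Deque<Integer> frame = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        for (int v : values) {
            frame.addLast(v);
            // The frame holds at most preceding + 1 rows: the current row counts too.
            if (frame.size() > preceding + 1) {
                frame.removeFirst();
            }
            int max = Integer.MIN_VALUE;
            for (int x : frame) {
                max = Math.max(max, x);
            }
            out.add(max);
        }
        return out;
    }

    public static void main(String[] args) {
        // Frames: [5], [5,1], [5,1,4], [1,4,2], [4,2,3]
        System.out.println(slidingMax(Arrays.asList(5, 1, 4, 2, 3), 2)); // prints [5, 5, 5, 4, 4]
    }
}
```

The first two output rows are computed over shorter frames because fewer than two preceding rows exist yet.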
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106366744 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.util.Preconditions +import org.apache.flink.table.functions.Accumulator +import java.lang.Iterable + +class BoundedProcessingOverWindowFunction[W <: Window]( --- End diff -- Thanks a lot for the clarification. I am really willing to do it right, but at the same time I need to understand. So, please be patient. 
:-) When we started discussing the issue with @fhueske (https://issues.apache.org/jira/browse/FLINK-5654?filter=-1), there was a decision to use a window, not a process function. Code consistency is pretty much the same; it is just extending a different interface. I understand that ProcessFunction can manage its state, but window checkpointing should replay all events in case of failure, so we would have consistent processing even without managing this level of granularity in the state. With procTime semantics, we can neglect retraction, and a window can customize its triggering function anyway. I don't understand the third point. Besides the alleged code consistency, the main argument I see for this specific case is that ProcessFunction supports granular state management.
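To make the "granular state management" argument concrete, here is a minimal sketch of what keeping the row-bounded frame as explicit per-key state could look like. It is plain Java, not the Flink ProcessFunction API: a `HashMap` stands in for keyed state, and the class `KeyedBoundedSum` and its `process` method are hypothetical names chosen for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: HashMaps stand in for keyed state; no Flink API is used.
class KeyedBoundedSum {
    private final int frameSize; // number of rows in the frame, current row included
    private final Map<String, Deque<Long>> rowsByKey = new HashMap<>();
    private final Map<String, Long> sumByKey = new HashMap<>();

    KeyedBoundedSum(int frameSize) {
        this.frameSize = frameSize;
    }

    // Handles one row and returns the SUM over the last `frameSize` rows of its key.
    long process(String key, long value) {
        Deque<Long> rows = rowsByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        long sum = sumByKey.getOrDefault(key, 0L);
        rows.addLast(value);
        sum += value; // add the new row's contribution incrementally
        if (rows.size() > frameSize) {
            sum -= rows.removeFirst(); // evict the oldest row and subtract its contribution
        }
        sumByKey.put(key, sum);
        return sum;
    }

    public static void main(String[] args) {
        KeyedBoundedSum agg = new KeyedBoundedSum(3);
        System.out.println(agg.process("a", 1));  // 1
        System.out.println(agg.process("a", 2));  // 3
        System.out.println(agg.process("b", 10)); // 10
        System.out.println(agg.process("a", 3));  // 6
        System.out.println(agg.process("a", 4));  // 9
    }
}
```

The buffer and the running aggregate are the only things that would need to be checkpointed per key in such a design; whether that outweighs the convenience of a count window is the trade-off under discussion.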
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106361152

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
     import java.util
    -import org.apache.calcite.rel.`type`._
    +import scala.collection.JavaConversions.asScalaBuffer
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rex.RexInputRef
    +import org.apache.calcite.rex.RexLiteral
    +import org.apache.calcite.rex.RexWindowBound
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.SqlKind
     import org.apache.calcite.sql.`type`.SqlTypeName
    -import org.apache.calcite.sql.`type`.SqlTypeName._
    -import org.apache.calcite.sql.fun._
    -import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
    -import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
    +import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
    +import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
    +import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
    +import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
    +import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
    --- End diff --

    I see your point, although I thought that wildcard imports were not a best practice. It seems that the Java and Scala implementations follow different conventions. I have no problem with it in principle.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106360688

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala ---
    @@ -18,8 +18,6 @@
     package org.apache.flink.table.plan.logical.rel
    -import java.util
    --- End diff --

    Thanks, both solutions look better than the half package import. I will apply one of those.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106238200

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
     import java.util
    -import org.apache.calcite.rel.`type`._
    +import scala.collection.JavaConversions.asScalaBuffer
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rex.RexInputRef
    +import org.apache.calcite.rex.RexLiteral
    +import org.apache.calcite.rex.RexWindowBound
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.SqlKind
     import org.apache.calcite.sql.`type`.SqlTypeName
    -import org.apache.calcite.sql.`type`.SqlTypeName._
    -import org.apache.calcite.sql.fun._
    -import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
    -import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
    +import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
    +import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
    +import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
    +import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
    +import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
    --- End diff --

    After an inspection, I realized that the imports you mentioned are used. I think there is no unused import at this moment. Am I missing something?
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106222135

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala ---
    @@ -18,8 +18,6 @@
     package org.apache.flink.table.plan.logical.rel
    -import java.util
    --- End diff --

    In principle I agree, but it caused some build problems. Is there a practical reason for not importing the List class directly rather than creating "half references"?
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106221708

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala ---
    @@ -37,8 +35,8 @@ class LogicalWindowAggregate(
         child: RelNode,
         indicator: Boolean,
         groupSet: ImmutableBitSet,
    -    groupSets: util.List[ImmutableBitSet],
    -    aggCalls: util.List[AggregateCall])
    +    groupSets: java.util.List[ImmutableBitSet],
    --- End diff --

    For some reason it does not build in Eclipse in the original way, and honestly I struggle to understand the "half package name" usage in Scala. Is there any practical reason for it? And does using the complete package name break some convention? I am asking out of honest curiosity, with no polemical intention. Thanks for clarifying.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106220189

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -159,6 +168,42 @@ class DataStreamOverAggregate(
         result
       }
    +  def createBoundedAndCurrentRowProcessingTimeOverWindow(
    +    inputDS: DataStream[Row]): DataStream[Row] = {
    +
    +    val overWindow: Group = logicWindow.groups.get(0)
    +    val partitionKeys: Array[Int] = overWindow.keys.toArray
    +    val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
    +
    +    // get the output types
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
    +
    +    val result: DataStream[Row] =
    +      // partitioned aggregation
    +      if (partitionKeys.nonEmpty) {
    +        val windowFunction = AggregateUtil.CreateBoundedProcessingOverWindowFunction(
    +          namedAggregates,
    +          inputType)
    +
    +        val lowerbound: Int = AggregateUtil.getLowerBoundary(
    +          logicWindow.constants,
    +          overWindow.lowerBound,
    +          getInput())
    +
    +        inputDS
    +          .keyBy(partitionKeys: _*)
    +          .countWindow(lowerbound, 1).apply(windowFunction)
    +          .returns(rowTypeInfo)
    +          .name(aggOpName)
    +          .asInstanceOf[DataStream[Row]]
    +      } // global non-partitioned aggregation
    +      else {
    +        throw TableException(
    --- End diff --

    If needed, I can do it.
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3547#discussion_r106219814

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
     import java.util
    -import org.apache.calcite.rel.`type`._
    +import scala.collection.JavaConversions.asScalaBuffer
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.`type`.RelDataType
     import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rex.RexInputRef
    +import org.apache.calcite.rex.RexLiteral
    +import org.apache.calcite.rex.RexWindowBound
    +import org.apache.calcite.sql.SqlAggFunction
    +import org.apache.calcite.sql.SqlKind
     import org.apache.calcite.sql.`type`.SqlTypeName
    -import org.apache.calcite.sql.`type`.SqlTypeName._
    -import org.apache.calcite.sql.fun._
    -import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
    -import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
    +import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
    +import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
    +import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
    +import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
    +import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
    +import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
    --- End diff --

    sure
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3547#discussion_r106219673 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.types.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector +import org.apache.flink.table.functions.AggregateFunction +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.util.Preconditions +import org.apache.flink.table.functions.Accumulator +import java.lang.Iterable + +class BoundedProcessingOverWindowFunction[W <: Window]( --- End diff -- @sunjincheng121 thanks for the suggestion. I decided to use Window because it is convenient in the row bounded case. Within the window I apply the incremental aggregation in the same way. 
It is not clear to me what the flexibility advantages are in this specific case. Can you be more explicit?
[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
GitHub user huawei-flink opened a pull request:

    https://github.com/apache/flink/pull/3547

    [FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)

    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added

    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-5653b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3547

commit 0e33399e145154b2dd68e0539b65a1baaba512bd
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T12:25:54Z

    First implementation of aggregation over procTime row bounded window

commit 77a7eff006a1a4aaec2f785994a716bcd9c84133
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T12:25:54Z

    First implementation of aggregation over procTime row bounded window

commit ecec5527b454ae9c1c3b037103b98660729d8958
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T13:03:13Z

    Merge branch 'master' of https://github.com/huawei-flink/flink into FLINK-5653b

    # Conflicts:
    # flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
    # flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink closed the pull request at:

    https://github.com/apache/flink/pull/3443
[GitHub] flink issue #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3443

    @fhueske, I am closing this PR and opening a new one in Scala.
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3459

    @fhueske Regarding point 3, "The first OVER window aggregation should serve as a blueprint for future OVER window implementations": is this a general thought, or are you indicating that we need to rework the code based on some specific class?
[GitHub] flink issue #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3443

    Hi @fhueske, about the squashing: what is the best strategy? I was thinking of just getting a clean branch, merging my contribution there, and then pushing it in one commit.

    About the Scala part, is it really necessary? I am in the process of "translating" some of the utils to Java to overcome the "multiple extension" limitation.
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3459

    Hi @fhueske

    I will start with the minor comments: in principle those were done by mistake within the rebase. Regarding the FunctionCatalog: that change was there because initially we used our own implementation for ProcTime(). The file as it is now should not be modified; after the rebase, the existing proctime is used.

    Related to the main comments:
    1) Scala/Java: I think it is a bit restrictive and unfair to say that only Scala is acceptable here for maintenance reasons, given that a large part of the Flink project is written in Java. Will you actually impose this restriction?
    2) Related to squashing the commits: we can check out a new branch again and add only the modifications in one push. I guess this should be OK?
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3443#discussion_r105162342

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala ---
    @@ -46,6 +52,20 @@ class DataStreamCalcRule
           calc.getProgram,
           description)
       }
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    --- End diff --

    Very nice. This was a workaround; it is better to manage it properly.
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3443#discussion_r105162261

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
    @@ -165,6 +165,11 @@ object FlinkRuleSets {
         // merge and push unions rules
         UnionEliminatorRule.INSTANCE,
    +
    +    // aggregations over intervals should be enabled to be translated also in
    +    //queries with LogicalWindows, not only queries with LogicalCalc
    +    ProjectWindowTransposeRule.INSTANCE,
    +    ProjectToWindowRule.INSTANCE,
    --- End diff --

    ok
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user huawei-flink commented on the issue:

    https://github.com/apache/flink/pull/3459

    @fhueske @wuchong @twalthr can you please take a look and see whether you can merge this? Thanks
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3459#discussion_r104431103

    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java ---
    @@ -0,0 +1,162 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.nodes.datastream;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelTraitSet;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.logical.LogicalWindow;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
    +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
    +import org.apache.flink.table.api.StreamTableEnvironment;
    +import org.apache.flink.table.api.TableConfig;
    +import org.apache.flink.table.calcite.FlinkTypeFactory;
    +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
    +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
    +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
    +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
    +import org.apache.flink.types.Row;
    +
    +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
    --- End diff --

    Which annotation? I am not familiar with Flink's class annotations. Is it Internal? Public? Or were you referring to the documentation? Thanks a lot for the clarification.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104430878 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { --- End diff -- I added a description similar to the ones in the other SQL-related classes.
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3459

Regarding the build failure: I see that it fails for only one particular module. I looked into the error and it is not related to my modifications, as you can see below. I did not touch the Cassandra connector, which is the one failing, nor, I would say, did I change anything that could conflict with it. From my point of view this could be pulled in.

[INFO] flink-libraries SUCCESS [ 0.271 s]
[INFO] flink-table SUCCESS [02:44 min]
[INFO] flink-jdbc . SUCCESS [ 0.898 s]
[INFO] flink-hbase SUCCESS [ 48.336 s]
[INFO] flink-hcatalog . SUCCESS [ 8.864 s]
[INFO] flink-metrics-jmx .. SUCCESS [ 0.487 s]
[INFO] flink-connector-kafka-base . SUCCESS [ 4.050 s]
[INFO] flink-connector-kafka-0.8 .. SUCCESS [ 3.325 s]
[INFO] flink-connector-kafka-0.9 .. SUCCESS [ 3.302 s]
[INFO] flink-connector-kafka-0.10 . SUCCESS [ 1.495 s]
[INFO] flink-connector-elasticsearch-base . SUCCESS [ 5.535 s]
[INFO] flink-connector-elasticsearch .. SUCCESS [01:07 min]
[INFO] flink-connector-elasticsearch2 . SUCCESS [ 14.613 s]
[INFO] flink-connector-rabbitmq ... SUCCESS [ 0.493 s]
[INFO] flink-connector-twitter SUCCESS [ 2.241 s]
[INFO] flink-connector-nifi ... SUCCESS [ 0.816 s]
[INFO] flink-connector-cassandra .. FAILURE [02:15 min]
[INFO] flink-connector-filesystem . SKIPPED
[INFO] flink-connector-kinesis SKIPPED
[INFO] flink-connector-elasticsearch5 . SKIPPED
[INFO] flink-examples-streaming ... SKIPPED
[INFO] flink-gelly SKIPPED
[INFO] flink-gelly-scala .. SKIPPED
[INFO] flink-gelly-examples ... SKIPPED
[INFO] flink-python ... SKIPPED
[INFO] flink-ml ... SKIPPED
[INFO] flink-cep .. SKIPPED
[INFO] flink-cep-scala SKIPPED
[INFO] flink-scala-shell .. SKIPPED
[INFO] flink-quickstart ... SKIPPED
[INFO] flink-quickstart-java .. SKIPPED
[INFO] flink-quickstart-scala . SKIPPED
[INFO] flink-storm SKIPPED
[INFO] flink-storm-examples ... SKIPPED
[INFO] flink-streaming-contrib SKIPPED
[INFO] flink-tweet-inputformat SKIPPED
[INFO] flink-connector-wikiedits .. SKIPPED
[INFO] flink-mesos SKIPPED
[INFO] flink-yarn . SKIPPED
[INFO] flink-metrics-dropwizard ... SKIPPED
[INFO] flink-metrics-ganglia .. SKIPPED
[INFO] flink-metrics-graphite . SKIPPED
[INFO] flink-metrics-statsd ... SKIPPED
[INFO] flink-dist . SKIPPED
[INFO] flink-fs-tests . SKIPPED
[INFO] flink-yarn-tests ... SKIPPED
[INFO]
[INFO] BUILD FAILURE
[INFO]
[INFO] Total time: 25:31 min
[INFO] Finished at: 2017-03-06T12:07:47+00:00
[INFO] Final Memory: 161M/493M
[INFO]
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) on project flink-connector-cassandra_2.10: There are test failures.
[ERROR]
[ERROR] Please refer to /home/travis/build/apache/flink/flink-connectors/flink-connector-cassandra/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For m
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104429058 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/StreamAggregator.java --- @@ -0,0 +1,23 @@ +package org.apache.flink.table.plan.nodes.datastream.aggs; --- End diff -- I added this before; indeed, it was failing the RAT check.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104428922 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; + + public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, RelTraitSet traitSet, RelNode input, + RelDataType rowType, String description, LogicalWindow windowReference) { + super(cluster, traitSet, input); + + this.rowType = rowType; + this.description = description; + 
this.windowReference = windowReference; + + } + + @Override + protected RelDataType deriveRowType() { + // TODO Auto-generated method stub + return super.deriveRowType(); + } + + @Override + public RelNode copy(RelTraitSet traitSet, java.util.List inputs) { + + if (inputs.size() != 1) { + System.err.println(this.getClass().getName() + " : Input size must be one!"); + } + + return new DataStreamProcTimeTimeAggregate(getCluster(), traitSet, inputs.get(0), getRowType(), + getDescription(), windowReference); + + } + + @Override + public DataStream translateToPlan(StreamTableEnvironment tableEnv) { + + // Get the general parameters related to the datastream, inputs, result + TableConfig config = tableEnv.getConfig(); + + DataStream inputDataStream = ((DataStreamRel) getInput()).translateToPlan(tableEnv); + + TypeInformation[] rowType = new TypeInformation[getRowType().getFieldList().size()]; --- End diff -- I am not sure how you suggest making the checks. Is there a preferred style in Flink for doing this? As for the suggestion to use a variable for getRowType().getFieldList(): I will do it, but in theory modern compilers such as Java's should identify and optimize this kind of thing. And therefore I would expect the resu
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104366556 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; --- End diff -- @shijinkui, thanks for the comment. I kept description because all the other SQL operator implementations have it; it was mainly for uniformity. Additionally, if at some point we decide to compile functions instead of working directly against the DataStream API, then this field is typically used for naming. For these reasons I propose to keep it.
[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3459 @shijinkui I think Radu and I complicated the code a bit by pulling some changes from each other's branches. Next time we'll squash the commits.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3459#discussion_r104362378 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes.datastream; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalWindow; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.table.api.StreamTableEnvironment; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil; +import org.apache.flink.table.plan.nodes.datastream.DataStreamRel; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction; +import org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction; +import org.apache.flink.types.Row; + +public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava { + + private LogicalWindow windowReference; + private String description; --- End diff -- @rtudoran you should have a look at this.
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r104361048 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java --- @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.java.stream.sql; + +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.types.Row; +import org.apache.flink.table.api.scala.stream.utils.StreamITCase; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.apache.flink.table.api.java.stream.utils.StreamTestData; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase { + + + @Test + public void testMaxAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + + DataStream resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List expected = new ArrayList<>(); + expected.add("1,0"); + expected.add("2,1"); + expected.add("2,2"); + expected.add("3,3"); + expected.add("3,4"); + expected.add("3,5"); + expected.add("4,6"); + expected.add("4,7"); + expected.add("4,8"); + expected.add("4,9"); + expected.add("5,10"); + expected.add("5,11"); + 
expected.add("5,12"); + expected.add("5,14"); + expected.add("5,14"); + + StreamITCase.compareWithList(expected); + } + + @Test + public void testMinAggregatation() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + env.setParallelism(1); + + DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env); + Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e"); + tableEnv.registerTable("MyTable", in); + + String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable"; + Table result = tableEnv.sql(sqlQuery); + +
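For readers following the quoted test, the frame `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` combined with `PARTITION BY a` means that each output row aggregates the current row plus up to two earlier rows with the same key. The sketch below illustrates that semantics in plain Java. It is not the PR's implementation and uses no Flink API; the names `RowsOverMaxSketch` and `maxOverLastRows`, as well as the sample rows, are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: plain Java, hypothetical names, not Flink code.
class RowsOverMaxSketch {

    /**
     * Each row is {a, c}. For every incoming row, emits "a,max" where max is
     * taken over the current row and up to `preceding` earlier rows that
     * share the same key a (rows are processed in arrival order).
     */
    static List<String> maxOverLastRows(int[][] rows, int preceding) {
        Map<Integer, Deque<Integer>> windows = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {
            int key = row[0];
            int value = row[1];
            Deque<Integer> window = windows.computeIfAbsent(key, k -> new ArrayDeque<>());
            window.addLast(value);
            // Keep at most (preceding + 1) rows per key: drop the oldest one.
            if (window.size() > preceding + 1) {
                window.removeFirst();
            }
            int max = Integer.MIN_VALUE;
            for (int v : window) {
                max = Math.max(max, v);
            }
            out.add(key + "," + max);
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data; after the 4th row the window for key 1 is {3, 1, 0}.
        int[][] rows = { {1, 5}, {1, 3}, {1, 1}, {1, 0}, {2, 7} };
        for (String line : maxOverLastRows(rows, 2)) {
            System.out.println(line);
        }
    }
}
```

The sketch recomputes the maximum over the small per-key buffer on every row, which keeps it simple; an operator built for large windows would probably want something smarter.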
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request: https://github.com/apache/flink/pull/3443#discussion_r104148648 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.datastream.aggs; + +import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO; + +import org.apache.flink.api.java.summarize.aggregation.Aggregator; +import org.apache.flink.api.java.summarize.aggregation.CompensatedSum; +import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator; + +public class DoubleSummaryAggregation extends NumericSummaryAggregator { --- End diff -- That is fine. I have pulled the new aggregation functions, and updated the PR without the reset method.
[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...
GitHub user huawei-flink opened a pull request:

    https://github.com/apache/flink/pull/3459

    [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-5654

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3459.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3459

commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-01T16:15:58Z

    First implementation of ProcTime()

commit e98c28616af1cf67d3ad3277d9cc2ca335604eca
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-02T10:30:40Z

    Disambiguate for the OVER BY clause, which should not be treated as a RexOver expression in Logical Project

commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-02T12:07:11Z

    Added return to disambiguation method for rexover

commit cda17565d5969f29b16923b631178a2cbf64791b
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-02T16:00:20Z

    Enable the LogicalWindow operators in query translation

commit 4b3e54281018b83c818f91e09a5321c34bbf297b
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-03T14:59:39Z

    Added a DataStreamRel version that can be extended in java

commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-03T15:35:18Z

    Add skeleton for the implementation of the aggregates over sliding window with processing time and time boundaries

commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T10:33:57Z

    committing changes with stub modifications before chekout proctime branch

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T13:17:43Z

    ignore aggregation test and implemented simple proctime test

commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T14:17:03Z

    Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5653

commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T09:42:41Z

    committing first key selector and utils

commit 31060e46f78729880c03e8cab0f92ff06faec4f0
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T11:16:43Z

    Changed ProcTime from time to timestamp

commit 69289bad836a5fdace271b28a15ca0e309e50b17
Author: rtudoran <tudoranr...@ymail.com>
Date: 2017-02-07T13:13:23Z

    Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5654

commit 3392817045ed166df5f55d22fde34cbd98c775db
Author: rtudoran <tudoranr...@ymail.com>
Date: 2017-02-07T13:14:50Z

    Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into FLINK-5654

commit d2ea0076b5e3561585c4eaea84025e50beaacf9a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T09:42:41Z

    fixing linelength and other issues

commit f29f564bb7fe7496b9f3d2f45a6b4469af559378
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T13:46:30Z

    Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git into FLINK-5653

    Conflicts:
    flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java
    flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103925068

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

Indeed, the StreamAggregator interface I defined has reset and evict methods, to support aggregation with associative functions. Anyway, there is no point in discussing it further. I will push my proposal and then you can see :-)
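The reset and evict methods mentioned in this comment can be illustrated with a minimal sketch. The StreamAggregator interface from the PR is not shown in this thread, so the interface below is a hypothetical stand-in that only borrows the name; LongSum and SlidingRowsSum are likewise invented for illustration. It assumes an aggregation such as SUM, whose contribution can be undone when a row leaves the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical interface; the real StreamAggregator from the PR is not shown in this thread. */
interface StreamAggregator<T, R> {
    void aggregate(T value); // a row enters the window
    void evict(T value);     // a row leaves the window
    void reset();            // back to the initial state, without allocating a new object
    R result();
}

/** SUM can undo a contribution, so evict is a plain subtraction. */
final class LongSum implements StreamAggregator<Long, Long> {
    private long sum;

    @Override public void aggregate(Long value) { sum += value; }
    @Override public void evict(Long value) { sum -= value; }
    @Override public void reset() { sum = 0L; }
    @Override public Long result() { return sum; }
}

/** Keeps the last (preceding + 1) rows and updates the sum incrementally. */
final class SlidingRowsSum {
    private final int capacity;
    private final Deque<Long> rows = new ArrayDeque<>();
    private final StreamAggregator<Long, Long> agg = new LongSum();

    SlidingRowsSum(int preceding) {
        this.capacity = preceding + 1;
    }

    long add(long value) {
        if (rows.size() == capacity) {
            agg.evict(rows.removeFirst()); // the oldest row falls out of the window
        }
        rows.addLast(value);
        agg.aggregate(value);
        return agg.result();
    }

    void clear() {
        rows.clear();
        agg.reset();
    }
}
```

With `new SlidingRowsSum(2)`, each call to `add` touches at most two values (the evicted one and the new one) instead of re-scanning the window. An aggregation like MAX cannot be evicted this cheaply, which is one reason the choice of interface matters.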
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103920777

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

I see that the AggregateFunction interface has no reset method, only createAccumulator(). This means that we have to create a new accumulator for every apply execution. With a reset method you just set the value back to its starting point, without creating a new object every time. What do you think?
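To make the allocation argument concrete, here is a small sketch of accumulator reuse. MaxAccumulator and ResetSketch are hypothetical names invented for this illustration; this is not Flink's AggregateFunction API, only the general pattern of resetting one object instead of creating a new one per evaluation.

```java
/** Hypothetical MAX accumulator; not Flink's AggregateFunction API. */
final class MaxAccumulator {
    private long max;
    private boolean empty = true;

    void accumulate(long value) {
        if (empty || value > max) {
            max = value;
            empty = false;
        }
    }

    /** Returns to the starting point without allocating anything. */
    void reset() {
        empty = true;
    }

    long get() {
        if (empty) {
            throw new IllegalStateException("no values accumulated");
        }
        return max;
    }
}

final class ResetSketch {
    /** Computes one MAX per batch while reusing a single accumulator object. */
    static long[] maxPerBatch(long[][] batches) {
        MaxAccumulator acc = new MaxAccumulator(); // allocated once, outside the loop
        long[] out = new long[batches.length];
        for (int i = 0; i < batches.length; i++) {
            acc.reset(); // instead of "new MaxAccumulator()" per batch
            for (long v : batches[i]) {
                acc.accumulate(v);
            }
            out[i] = acc.get();
        }
        return out;
    }
}
```

Whether the saved allocation matters in practice depends on how often the function is evaluated and on the JVM's handling of short-lived objects, so this is a design trade-off rather than a guaranteed win.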
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103919690

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

OK, so I will change my code to use the AggregateFunction you pointed out.
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103914685

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

Do you think this switch on aggregation could be done in another PR?
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103887096

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

The aggregate functions in org.apache.flink.table.runtime.aggregate.* assume a GroupReduce, whereas I have implemented the aggregation as a flatMap. Should I switch my implementation to a reduce? @fhueske what do you think?
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866134

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends NumericSummaryAggregator {
--- End diff --

Good point. I searched for a while but could not find anything that fit, so I decided to create stream-specific aggregators. The idea was to have something that could eventually be optimized for streams. However, I will try to reuse the existing ones.
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866164

--- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends StreamingMultipleProgramsTestBase {
+
+
+    @Test
+    public void testMaxAggregatation() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        env.setParallelism(1);
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+        tableEnv.registerTable("MyTable", in);
+
+        String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+        Table result = tableEnv.sql(sqlQuery);
+
+        DataStream resultSet = tableEnv.toDataStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink());
+        env.execute();
+
+        List expected = new ArrayList<>();
+        expected.add("1,0");
+        expected.add("2,1");
+        expected.add("2,2");
+        expected.add("3,3");
+        expected.add("3,4");
+        expected.add("3,5");
+        expected.add("4,6");
+        expected.add("4,7");
+        expected.add("4,8");
+        expected.add("4,9");
+        expected.add("5,10");
+        expected.add("5,11");
+        expected.add("5,12");
+        expected.add("5,14");
+        expected.add("5,14");
+
+        StreamITCase.compareWithList(expected);
+    }
+
+    @Test
+    public void testMinAggregatation() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        env.setParallelism(1);
+
+        DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+        Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+        tableEnv.registerTable("MyTable", in);
+
+        String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+        Table result = tableEnv.sql(sqlQuery);
+
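As a reading aid for the queries in this test, the following sketch shows in plain Java what `MAX(c) OVER (PARTITION BY a ... ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)` is expected to compute: for every incoming row, the maximum over that row and up to two earlier rows with the same key. RowsOverMax is a hypothetical helper written for this illustration; it does not use Flink and makes no claim about the contents of StreamTestData.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: bounded ROWS window per partition key, evaluated row by row. */
final class RowsOverMax {
    /**
     * @param rows      input rows in arrival order, each as {key, value}
     * @param preceding number of preceding rows in the window (2 in the test queries)
     * @return one "key,max" string per input row
     */
    static List<String> maxOver(int[][] rows, int preceding) {
        Map<Integer, Deque<Integer>> windows = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int[] row : rows) {
            Deque<Integer> window = windows.computeIfAbsent(row[0], k -> new ArrayDeque<>());
            window.addLast(row[1]);
            if (window.size() > preceding + 1) {
                window.removeFirst(); // drop the row that is now more than `preceding` rows back
            }
            out.add(row[0] + "," + Collections.max(window));
        }
        return out;
    }
}
```

For a single key with values 9, 5, 3, 1 and `preceding = 2`, the windows are [9], [9, 5], [9, 5, 3] and [5, 3, 1], so the fourth result drops to 5 once the 9 has left the window.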
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103865652

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.logical.rel.util;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rex.RexLiteral;
+
+import com.google.common.collect.ImmutableList;
+
+public class WindowAggregateUtil implements Serializable {
+
+    private static final long serialVersionUID = -3916551736243544540L;
+
+    private LogicalWindow windowPointer = null;
+
+    public WindowAggregateUtil() {
+
+    }
+
+    public WindowAggregateUtil(LogicalWindow window) {
+        this.windowPointer = window;
+
+    }
+
+    /**
+     * A utility function that checks whether a window is partitioned or it is a
+     * global window.
+     *
+     * @param LogicalWindow
+     *            window to be checked for partitions
+     * @return true if partition keys are defined, false otherwise.
+     */
+    public boolean isStreamPartitioned(LogicalWindow window) {
+        // if it exists a group bounded by keys, the it is
+        // a partitioned window
+        for (Group group : window.groups) {
+            if (!group.keys.isEmpty()) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    public int[] getKeysAsArray(Group group) {
+        if (group == null) {
+            return null;
+        }
+        return group.keys.toArray();
+    }
+
+    /**
+     * This method returns the [[int]] lowerbound of a window when expressed
+     * with an integer e.g. ... ROWS BETWEEN [[value]] PRECEDING AND CURRENT ROW
+     *
+     * @param constants
+     *            the list of constant to get the offset value
+     * @return return the value of the lowerbound if available -1 otherwise
+     */
+
+    public int getLowerBoundary(ImmutableList<RexLiteral> constants) {
+        return ((Long)constants.get(0).getValue2()).intValue();
--- End diff --

Thank you very much. I was indeed puzzled by this. I will fix it according to your suggestion. I even tried the Calcite mailing list, but nothing like this came up.
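One detail visible in the quoted diff is that the Javadoc of getLowerBoundary promises -1 when no lower bound is available, while the method body casts the first constant unconditionally. The reviewer's actual suggestion is not quoted in this thread, so the following is only a sketch of how that documented contract could be honoured. LowerBoundSketch is a hypothetical class, and it takes the already extracted constant values as a plain list rather than Calcite's RexLiteral objects, so that it stays self-contained.

```java
import java.util.List;

/** Hypothetical, Calcite-free sketch of the "-1 if not available" contract from the Javadoc. */
final class LowerBoundSketch {
    /**
     * @param constantValues values of the window constants (assumed to be already
     *                       extracted from the literals; the first one is the offset)
     * @return the lower bound as an int, or -1 if it is missing or not numeric
     */
    static int lowerBoundary(List<?> constantValues) {
        if (constantValues == null || constantValues.isEmpty()) {
            return -1; // no constants at all, e.g. an unbounded window
        }
        Object first = constantValues.get(0);
        if (first instanceof Number) {
            return ((Number) first).intValue(); // avoids a ClassCastException on non-Long numbers
        }
        return -1;
    }
}
```

Checking against `Number` instead of casting to `Long` keeps the method from failing if the constant arrives as a different numeric type.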
[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3443

[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-5653

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3443.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3443

commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-01T16:15:58Z

    First implementation of ProcTime()

commit e98c28616af1cf67d3ad3277d9cc2ca335604eca
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-02T10:30:40Z

    Disambiguate for the OVER BY clause, which should not be treated as a RexOver expression in Logical Project

commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-02T12:07:11Z

    Added return to disambiguation method for rexover

commit cda17565d5969f29b16923b631178a2cbf64791b
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-02T16:00:20Z

    Enable the LogicalWindow operators in query translation

commit 4b3e54281018b83c818f91e09a5321c34bbf297b
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-03T14:59:39Z

    Added a DataStreamRel version that can be extended in java

commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4
Author: rtudoran <rtudoran@bigdata-hp3>
Date: 2017-02-03T15:35:18Z

    Add skeleton for the implementation of the aggregates over sliding window with processing time and time boundaries

commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T10:33:57Z

    committing changes with stub modifications before chekout proctime branch

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T13:17:43Z

    ignore aggregation test and implemented simple proctime test

commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-06T14:17:03Z

    Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5653

commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T09:42:41Z

    committing first key selector and utils

commit 31060e46f78729880c03e8cab0f92ff06faec4f0
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T11:16:43Z

    Changed ProcTime from time to timestamp

commit 69289bad836a5fdace271b28a15ca0e309e50b17
Author: rtudoran <tudoranr...@ymail.com>
Date: 2017-02-07T13:13:23Z

    Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into FLINK-5654

commit 3392817045ed166df5f55d22fde34cbd98c775db
Author: rtudoran <tudoranr...@ymail.com>
Date: 2017-02-07T13:14:50Z

    Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into FLINK-5654

commit d2ea0076b5e3561585c4eaea84025e50beaacf9a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T09:42:41Z

    fixing linelength and other issues

commit f29f564bb7fe7496b9f3d2f45a6b4469af559378
Author: Stefano Bortoli <s.bort...@gmail.com>
Date: 2017-02-07T13:46:30Z

    Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git into FLINK-5653

    Conflicts:
    flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java
    flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util
[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3302 @fhueske no problem, I understand. It was bad timing, as I was on the road for more than two weeks, with little time to follow this. We'll contribute to other issues.
[GitHub] flink issue #3370: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3370 @haohui thanks for the contribution. I merged your code and will push it later today.
[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3302 I managed to merge the changes from PR #3370 into my branch after a rebase, and the test works. I will push the code later today. Sorry if I am a little slow to respond, but I am travelling and connectivity is sometimes a problem.
[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3302 Hi Fabian, I will follow up in the next few days, probably early next week. Is that OK? Stefano
[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3302 @fhueske I've addressed most of the points; however, there is one thing that is still not clear to me. So far, the procTime() function generates a timestamp. My understanding is that this is not correct and that it should be something else. Could it be a default timestamp (e.g. the epoch)? The actual timestamp normalized to the second? What is the best option in your opinion?
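To make one of the options in this question concrete, "the actual timestamp normalized to the second" could look like the sketch below. ProcTimeSketch is a hypothetical class written only to illustrate the arithmetic; the thread does not say which option was chosen, and this is not how Flink implements procTime().

```java
/** Hypothetical illustration of normalizing a processing-time timestamp to the second. */
final class ProcTimeSketch {
    /** Drops the millisecond part of an epoch-millisecond timestamp. */
    static long truncateToSecond(long epochMillis) {
        // floorDiv rounds toward negative infinity, so pre-epoch values also round down
        return Math.floorDiv(epochMillis, 1000L) * 1000L;
    }

    /** Current wall-clock time, normalized to the second. */
    static long currentProcTime() {
        return truncateToSecond(System.currentTimeMillis());
    }
}
```

The other option raised in the comment, a fixed default such as the epoch, would simply be the constant `0L` and would make it explicit that the value carries no information.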
[GitHub] flink pull request #3271: [FLINK-5710] Add ProcTime() function to indicate S...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3271#discussion_r101455204

--- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.calcite.functions;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class FlinkStreamFunctionCatalog {
+
+    /**
+     * An explicit representation of TIMESTAMP as an SQL return type
+     */
+    private static final SqlReturnTypeInference TIMESTAMP = ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
+
+    /**
+     * A a parameterless scalar function that just indicates processing time mode.
+     */
+    public static final SqlFunction PROCTIME = new SqlFunction("PROCTIME", SqlKind.OTHER_FUNCTION, TIMESTAMP, null, OperandTypes.NILADIC, SqlFunctionCategory.TIMEDATE);
--- End diff --

Should I simply create the same class, using PROCTIME in place of ROWTIME?
[GitHub] flink pull request #3271: [FLINK-5710] Add ProcTime() function to indicate S...
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3271#discussion_r101454598

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ProcTimeCallGen.scala ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+ * Generates function call to determine current time point (as date/time/timestamp) in
--- End diff --

I am not sure I understand this point. Should the generate method itself throw an exception, or should code that throws the exception be generated?
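The two readings of the question can be sketched side by side. CodegenSketch and both of its methods are hypothetical and unrelated to Flink's actual CodeGenerator classes; the sketch only shows the difference between failing while the code is being generated and emitting code that fails when it is executed.

```java
/** Hypothetical code generator showing the two places an exception can live. */
final class CodegenSketch {
    /** Option 1: the generator itself throws, so the error surfaces at translation time. */
    static String generateOrFail(boolean supported) {
        if (!supported) {
            throw new UnsupportedOperationException("procTime() cannot be evaluated here");
        }
        return "System.currentTimeMillis()";
    }

    /** Option 2: generation succeeds, but the emitted source throws when it is run. */
    static String generateThrowingCode() {
        return "throw new UnsupportedOperationException(\"procTime() cannot be evaluated here\");";
    }
}
```

With the first option the query is rejected before any job starts; with the second, the failure only shows up once a record reaches the generated function.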
[GitHub] flink pull request #3302: [FLINK-5710] Add ProcTime() function to indicate S...
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3302

[FLINK-5710] Add ProcTime() function to indicate StreamSQL

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/huawei-flink/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3302.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3302
[GitHub] flink issue #3271: FLINK-5710
Github user huawei-flink commented on the issue: https://github.com/apache/flink/pull/3271 @fhueske not sure this was noticed.
[GitHub] flink pull request #3271: Flink 5710
GitHub user huawei-flink opened a pull request: https://github.com/apache/flink/pull/3271 Flink 5710 Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes. - [ ] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [ ] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [ ] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed You can merge this pull request into a Git repository by running: $ git pull https://github.com/huawei-flink/flink FLINK-5710 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3271.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3271 commit 72ec35a7380a4d73bd092ce14962ab2248139bae Author: Stefano Bortoli <s.bort...@gmail.com> Date: 2017-02-01T16:15:58Z First implementation of ProcTime() commit eaf4e92784dab01b17004390968ca4b1fe7c4bea Author: Stefano Bortoli <s.bort...@gmail.com> Date: 2017-02-06T13:17:43Z ignore aggregation test and implemented simple proctime test