[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

2017-05-11 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3783
  
@fhueske , this is the PR with the code generated distinct aggregation for 
OVER. You mentioned that the value of the aggregation should be a Row, but what 
is kept in the distinct state is just the event value, not its "aggregation 
value state". Perhaps you can try to explain it better to me so that I can 
complete this PR and we can move on. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

2017-04-28 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3783
  
@rtudoran @fhueske the first implementation I made was with the state in 
the ProcessFunction without code generated aggregation function. Second, I 
pushed a branch with the state in the process function using the code generated 
process function. Then, third I moved the state within the code generated 
function. 

It is not clear to me why the state cannot be within the code generated 
function. Could you please clarify so that we can understand whether it is 
worth working around it. This feature is quite important for us.

Anyway, you could have a look at the branch that uses the state in the 
process function and uses the code generated aggregation functions. Basically, 
rather than generate one code generated function for all the aggregations, I 
create one class for each, and then I call the corresponding accumulate/retract 
using the distinct logic when marked in the process function. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske I have created #3783 with just the code generation part. At least 
the GROUP BY distinct can move ahead. I will close this PR and wait for the 
merging of the Calcite fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3783: [FLINK-6338] Add support for DISTINCT into Code Generated...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3783
  
@fhueske please have a look at this PR, it contains just the code 
generation part with optional distinct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3783: [FLINK-6338] Add support for DISTINCT into Code Ge...

2017-04-26 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3783

[FLINK-6338] Add support for DISTINCT into Code Generated Aggregations

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6338

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3783


commit 55e0a8d38187bef22b6135db7f4a5c1cc8f15811
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-04-26T17:22:04Z

Added code generation distinct aggregation logic




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
So, what do you want me to keep for this PR? just the code generation and 
its test?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472166
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala
 ---
@@ -72,7 +73,15 @@ class ProcTimeBoundedRowsOver(
   genAggregations.code)
 LOG.debug("Instantiating AggregateHelper.")
 function = clazz.newInstance()
-
+
+var initialized = false
+for(i <- distinctAggFlags.indices){
+  if(distinctAggFlags(i) && !initialized){
+function.initialize(getRuntimeContext())
--- End diff --

right!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113472029
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
+  
+  /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the 
distinct states
+*/
+  def initialize(ctx: RuntimeContext)
 
   /**
-* Sets the results of the aggregations (partial or final) to the 
output row.
--- End diff --

probably an error in the merging. sorry about that


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471605
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -335,14 +371,28 @@ class CodeGenerator(
 j"""
 |  public final void accumulate(
 |org.apache.flink.types.Row accs,
-|org.apache.flink.types.Row input)""".stripMargin
+|org.apache.flink.types.Row input) throws 
Exception""".stripMargin
 
   val accumulate: String = {
 for (i <- aggs.indices) yield
-  j"""
+ if(distinctAggsFlags(i)){
+   j"""
+  |  Long distValCount$i = (Long) 
distStateList[$i].get(${parameters(i)});
+  |  if( distValCount$i == null){
--- End diff --

In scala it is 0L, is Java it is null. :-/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3771#discussion_r113471045
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -296,6 +297,41 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(existDistinct : Boolean): String = {
+  
+  val sig: String = 
+j"""
+   | org.apache.flink.api.common.state.MapState[] distStateList =
+   |   new org.apache.flink.api.common.state.MapState[ 
${distinctAggsFlags.size} ];
+   | 
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+  if(existDistinct){
--- End diff --

you are right


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @haohui I have no problem removing the DIST() part, it is just not 
possible to test it without. Shall I push just the code generation and 
aggregates util changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread huawei-flink
Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3732


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-19 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112210389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBu

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-19 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112210097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("disti

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-19 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112180091
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("disti

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-19 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112151646
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBu

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-19 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112151222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala
 ---
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueState: MapState[Any, Row] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new 
ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], 
rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = 
getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = 
getRuntimeContext.getState(smallestTimestampDescriptor)
+
+val distinctValDescriptor : MapStateDescriptor[Any, Row] =
+  new MapStateDescriptor[Any, Row]("distinctValuesBu

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-18 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3732

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-6250

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3732


commit 4e3da4c9baebf48bfc47ef192287e7e17ab69efd
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-04-18T12:27:26Z

DIST() for aggregation on procTime row bounded windows




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-04-18 Thread huawei-flink
Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3459


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-27 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3574
  
@fhueske Thanks a lot of the clarification. I understand the issue better 
now, and see your attempt to make an average case that would work for both in 
memory as well as on external persistence. Considering RocksDB as the state of 
art, your choice sounds much more reasonable. We are well aware of the costs of 
serialization, and the impact is definitely important.  However, low latency 
systems with strict SLA will likely run just in memory. 

The O(n) of the MapState is granted by the fact that time is monothonic and 
therefore the sequential reading is managed by the key timestamp. The cost of 
each O(1) in the hashmap increseas with the size of the window thou as you need 
to search through the map index. We definitely need better data access patterns 
for the state of "time series" types of data. 

I will try to internalize it and provide the MapState implementation




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-27 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3574
  
Hi @fhueske, @sunjincheng121 ,

let me try to explain my perspective on this specific case (row based, proc 
time). This is for the purpose of discussion, to show that we are spending 
thoughts on this topic for a while now.

In case of the row range, the "serialization savings" coming from MapState 
exists up to the point in which the "buffer" is filled. After that that, we 
need to start retracting to keep the value correct and to do that, we need to 
deserialize all the objects. as @rtudoran mentioned, we implemented a version 
using a Queue object.

This has many advantages:
- removing the object from the buffer at the right moment freeing memory on 
the go (without any iteration over the key set) 
- has the data access pattern of O(1) without any "key resolution costs" 
and no list iteration
- keeps the natural processing order by design, without the need of 
indexing objects with timestamps
- the experiments we run show that there are no difference for windows up 
to 100k elements, and after that the queue seems to be more efficient (as the 
the key resolution does not come for free). 

The map state may have a slight advantage in the early stages, when the 
window is not filled, but after it just introduces useless operations. 
Furthermore, the need to index objects with a created timestamp (more memory 
wasted), dealing with a sequential access (List) to get the most recent object 
when you can actually just use the natural arrival order seems useless 
complication. Applying the Occam Razor there should be no doubt on which 
solution we should be selecting first. The serialization optimization while 
window gets filled sounds like a premature optimization not worth in the long 
run. The further implementation of SQL operators (e.g. LIMIT, OFFSET etc)  can 
just benefit from the fact that the state is already sorted, whereas the map 
would need to be sorted all the time. 

Of course I am talking specifically of the procTime semantic operations. 
eventTime is another story anyway. The map state as minor advantages in the 
beginning (as anyway the serialization costs are small), the queue state as 
advantages in executions running steadily because of access pattern and natural 
buffer cleansing.  

These are my two cents on the discussion




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-21 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3574
  
I have a first implementation of the processFunction, using a Queue as a 
state. However, I need to implement the retractableAggregation, as AVG for 
example is not supported.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-20 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3574
  
@fhueske let's see if the third attempt works. I have included all the 
comments of @sunjincheng121 (apart from the window function) and having done a 
merge on a fresh checkout also the imports should be fine. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-20 Thread huawei-flink
Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3547


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3574: Eager aggregation over row bounded window

2017-03-20 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3574

Eager aggregation over row bounded window

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653c

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3574.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3574






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-20 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3547
  
sorry about the mess. I don't understand the mess the rebase does with 
eclipse... I will close this PR and open another one including all the changes 
and comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-20 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106917566
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -17,35 +17,62 @@
  */
 package org.apache.flink.table.runtime.aggregate
 
-import java.util
+import scala.collection.JavaConversions.asScalaBuffer
--- End diff --

I need that import, without it does not build.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-20 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106868683
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/ProcTimeRowBoundedAggregationTest.scala
 ---
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+import org.apache.flink.types.Row
+
+class ProcTimeRowBoundedAggregationTest extends StreamingWithStateTestBase 
{
--- End diff --

@fhueske Should I add all the test unit under the SqlITCase class? 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-17 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106616178
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -106,9 +113,14 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} // lowerbound is a BasicType and upperbound is PRECEEDING or 
CURRENT ROW
+else if 
(overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType]
--- End diff --

I guess that is also a way to do it. The check allows to distinguish 
between time bounded and row bounded. I have no particular affection for my 
solution, it just worked. I will apply and test yours as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-17 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106615708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -17,34 +17,41 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
--- End diff --

@fhueske sorry about that. Will be more careful in the next one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106465657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class BoundedProcessingOverWindowFunction[W <: Window](
--- End diff --

I just thought about a case where one wants to do a COUNT DISTINCT type of 
aggregation. How does a processFunction work for that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106453475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
--- End diff --

I will apply this fix also in my function


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106452550
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream resultSet = tableEnv.toDataStream(result, 
Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+

[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106369028
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream resultSet = tableEnv.toDataStream(result, 
Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+

[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106367506
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
--- End diff --

Why should also the test be implemented in scala? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106367130
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -130,32 +142,76 @@ class DataStreamOverAggregate(
 val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
 
 val result: DataStream[Row] =
-// partitioned aggregation
-if (partitionKeys.nonEmpty) {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType)
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType)
 
-  inputDS
+inputDS
   .keyBy(partitionKeys: _*)
   .process(processFunction)
   .returns(rowTypeInfo)
   .name(aggOpName)
   .asInstanceOf[DataStream[Row]]
-}
-// non-partitioned aggregation
-else {
-  val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
-namedAggregates,
-inputType,
-false)
-
-  inputDS
-
.process(processFunction).setParallelism(1).setMaxParallelism(1)
-.returns(rowTypeInfo)
-.name(aggOpName)
-.asInstanceOf[DataStream[Row]]
-}
+  } // non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedProcessingOverProcessFunction(
+  namedAggregates,
+  inputType,
+  false)
+
+inputDS
+  .process(processFunction).setParallelism(1).setMaxParallelism(1)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+result
+  }
+
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound,1)
+  .apply(windowFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  } // global non-partitioned aggregation
+  else {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverGlobalWindowFunction(
+  namedAggregates,
+  inputType)
+
+inputDS
+  .countWindowAll(lowerbound,1)
--- End diff --

So, the semantic of between 2 rows and current row does not include the 
current row and I should count 3 elements?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106366744
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class BoundedProcessingOverWindowFunction[W <: Window](
--- End diff --

Thanks a lot for the clarification. I am really willing to do it right, but 
at the same time I need to understand. So, please be patient. :-) 

When we started discussing the issue with @fhueske 
(https://issues.apache.org/jira/browse/FLINK-5654?filter=-1) there was a 
decision to use window, not process function. 
Code consistency is pretty much the same, just extening a different 
interface. I understand that ProcessFunction can manage its state, but window 
checkpointing should replay all events in case of failure, so we would have 
consistent processing even without managing this level of granularity in the 
state. With procTime semantic, we can neglect retraction, and window can anyway 
customize triggering function. 

I don't understand the third point. 

The main argument I see for this specific case is that ProcessFunction 
supports granular state management. Besides the alleged code consistency. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106361152
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

I see your point, although I thought that wildcard import was not a best 
practice. It seems that the java and scala implementation are following 
different conventions. I have no problems with it in principle.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-16 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106360688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 ---
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.logical.rel
 
-import java.util
--- End diff --

Thanks, both solutions look better than the half package import. I will 
apply one of those. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106238200
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

after an inspection, I realized that the imports you mentioned are used. I 
think there is no unused import at this moment. Am I missing something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106222135
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 ---
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.plan.logical.rel
 
-import java.util
--- End diff --

In principle I agree, but I it caused some building problem. Is there a 
practical reason for not importing the List class directly rather than creating 
"half references"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106221708
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/rel/LogicalWindowAggregate.scala
 ---
@@ -37,8 +35,8 @@ class LogicalWindowAggregate(
 child: RelNode,
 indicator: Boolean,
 groupSet: ImmutableBitSet,
-groupSets: util.List[ImmutableBitSet],
-aggCalls: util.List[AggregateCall])
+groupSets: java.util.List[ImmutableBitSet],
--- End diff --

for some reason it does not build in eclipse in the original way, and 
honestly I struggle to understand "the half package name" usage in Scala. Is 
there any practical reason for that? and does it break some convention to use 
the complete package name? I am asking with honest curiosity, and no polemical 
intention. Thanks for clarifying.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106220189
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -159,6 +168,42 @@ class DataStreamOverAggregate(
 result
   }
 
+def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val windowFunction = 
AggregateUtil.CreateBoundedProcessingOverWindowFunction(
+  namedAggregates,
+  inputType)
+
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
+  logicWindow.constants,
+  overWindow.lowerBound,
+  getInput())
+
+inputDS
+  .keyBy(partitionKeys: _*)
+  .countWindow(lowerbound, 1).apply(windowFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  } // global non-partitioned aggregation
+  else {
+throw TableException(
--- End diff --

If needed, I can do it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106219814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -19,33 +19,124 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.util
 
-import org.apache.calcite.rel.`type`._
+import scala.collection.JavaConversions.asScalaBuffer
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rex.RexInputRef
+import org.apache.calcite.rex.RexLiteral
+import org.apache.calcite.rex.RexWindowBound
+import org.apache.calcite.sql.SqlAggFunction
+import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.sql.fun._
-import org.apache.calcite.sql.{SqlAggFunction, SqlKind}
-import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.calcite.sql.`type`.SqlTypeName.BIGINT
+import org.apache.calcite.sql.`type`.SqlTypeName.BOOLEAN
+import org.apache.calcite.sql.`type`.SqlTypeName.DECIMAL
+import org.apache.calcite.sql.`type`.SqlTypeName.DOUBLE
+import org.apache.calcite.sql.`type`.SqlTypeName.FLOAT
+import org.apache.calcite.sql.`type`.SqlTypeName.INTEGER
+import org.apache.calcite.sql.`type`.SqlTypeName.SMALLINT
+import org.apache.calcite.sql.`type`.SqlTypeName.TINYINT
--- End diff --

sure


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106219673
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverWindowFunction.scala
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class BoundedProcessingOverWindowFunction[W <: Window](
--- End diff --

@sunjincheng121 thanks for the suggestion. I decided to use Window because 
it is convenient in the row bounded case. Within the window I apply the 
incremental aggregation in the same way.

It is not clear to me what are the flexibility advantages in this specific 
case. Can you be more explicit?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3547

[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation 
to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ x ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3547


commit 0e33399e145154b2dd68e0539b65a1baaba512bd
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T12:25:54Z

First implementation of aggregation over procTime row bounded window

commit 77a7eff006a1a4aaec2f785994a716bcd9c84133
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T12:25:54Z

First implementation of aggregation over procTime row bounded window

commit ecec5527b454ae9c1c3b037103b98660729d8958
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-15T13:03:13Z

Merge branch 'master' of https://github.com/huawei-flink/flink into
FLINK-5653b

# Conflicts:
#   
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
#   
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-15 Thread huawei-flink
Github user huawei-flink closed the pull request at:

https://github.com/apache/flink/pull/3443


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-15 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3443
  
@fhueske, I am closing this PR and opening a new one in Scala.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3459
  
@fhueske 
3. "•The first OVER window aggregation should serve as a blueprint for 
future OVER window implementations." - is this a general thought or you 
indicate that we need to rework the code based on some specific class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3443
  
Hi @fhueske about the squashing, what is the best strategy? I was thinking 
to just get a clean branch and merge my contribution there and then push it in 
one commit. 

About the scala part, is it really necessary? I am in the process of 
"translating" some of the utils in Java to overcome the "multiple extension" 
limitation. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3459
  
Hi @fhueske 

I will start with the minor comments: in principle those were done by 
mistake within the rebase...
Regarding the FunctionCatalog - that was because initially we used our own 
implementation fro ProcTime()the file as it is now it should not be 
modified. - now after the rebase the proctime is used

Related to the main comments:
1) scala/java - I think it is a bit restrictive and unfair to say that only 
scala is acceptable here for maintenance reasons  given that a large part of 
the Flink project is written in java... Will you actually impose these 
restriction?
2) related to the squash commits...we can check out again a new branch and 
add only the modifications in one push - I guess this should be ok?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105162342
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamCalcRule.scala
 ---
@@ -46,6 +52,20 @@ class DataStreamCalcRule
   calc.getProgram,
   description)
   }
+
+  override def matches(call: RelOptRuleCall): Boolean = {
--- End diff --

very nice. This was a work-around, better to manage it propertly


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-09 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r105162261
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 ---
@@ -165,6 +165,11 @@ object FlinkRuleSets {
 
   // merge and push unions rules
   UnionEliminatorRule.INSTANCE,
+  
+  // aggregations over intervals should be enabled to be translated 
also in
+  //queries with LogicalWindows, not only queries with LogicalCalc
+  ProjectWindowTransposeRule.INSTANCE,
+  ProjectToWindowRule.INSTANCE,
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-08 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3459
  
@fhueske @wuchong @twalthr  can you please take a look to see if you can 
merge this. Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104431103
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
--- End diff --

what annotation? I am not familiar with the class annotation of Flink. is 
it Internal? Public? Thanks a lot for the clarification. Or were you referring 
to the documentation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104430878
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
--- End diff --

I added a similar description with the other SQL related classes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3459
  
Related to the building failure - I see that this fails only for one 
particular case. I looked into the error and it is not related to my 
modifications as you cans see below. In fact I did not touch on the Cassandra 
connector which is the one failing nor I caused, I would say, any things to 
conflict with it. 
From my point of view this could be pulled in 

[INFO] flink-libraries  SUCCESS [  
0.271 s]
[INFO] flink-table  SUCCESS [02:44 
min]
[INFO] flink-jdbc . SUCCESS [  
0.898 s]
[INFO] flink-hbase  SUCCESS [ 
48.336 s]
[INFO] flink-hcatalog . SUCCESS [  
8.864 s]
[INFO] flink-metrics-jmx .. SUCCESS [  
0.487 s]
[INFO] flink-connector-kafka-base . SUCCESS [  
4.050 s]
[INFO] flink-connector-kafka-0.8 .. SUCCESS [  
3.325 s]
[INFO] flink-connector-kafka-0.9 .. SUCCESS [  
3.302 s]
[INFO] flink-connector-kafka-0.10 . SUCCESS [  
1.495 s]
[INFO] flink-connector-elasticsearch-base . SUCCESS [  
5.535 s]
[INFO] flink-connector-elasticsearch .. SUCCESS [01:07 
min]
[INFO] flink-connector-elasticsearch2 . SUCCESS [ 
14.613 s]
[INFO] flink-connector-rabbitmq ... SUCCESS [  
0.493 s]
[INFO] flink-connector-twitter  SUCCESS [  
2.241 s]
[INFO] flink-connector-nifi ... SUCCESS [  
0.816 s]
[INFO] flink-connector-cassandra .. FAILURE [02:15 
min]
[INFO] flink-connector-filesystem . SKIPPED
[INFO] flink-connector-kinesis  SKIPPED
[INFO] flink-connector-elasticsearch5 . SKIPPED
[INFO] flink-examples-streaming ... SKIPPED
[INFO] flink-gelly  SKIPPED
[INFO] flink-gelly-scala .. SKIPPED
[INFO] flink-gelly-examples ... SKIPPED
[INFO] flink-python ... SKIPPED
[INFO] flink-ml ... SKIPPED
[INFO] flink-cep .. SKIPPED
[INFO] flink-cep-scala  SKIPPED
[INFO] flink-scala-shell .. SKIPPED
[INFO] flink-quickstart ... SKIPPED
[INFO] flink-quickstart-java .. SKIPPED
[INFO] flink-quickstart-scala . SKIPPED
[INFO] flink-storm  SKIPPED
[INFO] flink-storm-examples ... SKIPPED
[INFO] flink-streaming-contrib  SKIPPED
[INFO] flink-tweet-inputformat  SKIPPED
[INFO] flink-connector-wikiedits .. SKIPPED
[INFO] flink-mesos  SKIPPED
[INFO] flink-yarn . SKIPPED
[INFO] flink-metrics-dropwizard ... SKIPPED
[INFO] flink-metrics-ganglia .. SKIPPED
[INFO] flink-metrics-graphite . SKIPPED
[INFO] flink-metrics-statsd ... SKIPPED
[INFO] flink-dist . SKIPPED
[INFO] flink-fs-tests . SKIPPED
[INFO] flink-yarn-tests ... SKIPPED
[INFO] 

[INFO] BUILD FAILURE
[INFO] 

[INFO] Total time: 25:31 min
[INFO] Finished at: 2017-03-06T12:07:47+00:00
[INFO] Final Memory: 161M/493M
[INFO] 

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-surefire-plugin:2.18.1:test (integration-tests) 
on project flink-connector-cassandra_2.10: There are test failures.
[ERROR] 
[ERROR] Please refer to 
/home/travis/build/apache/flink/flink-connectors/flink-connector-cassandra/target/surefire-reports
 for the individual test results.
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For m

[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104429058
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/StreamAggregator.java
 ---
@@ -0,0 +1,23 @@
+package org.apache.flink.table.plan.nodes.datastream.aggs;
--- End diff --

I added this before...indeed it was failing the RAT 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104428922
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
+
+   public DataStreamProcTimeTimeAggregate(RelOptCluster cluster, 
RelTraitSet traitSet, RelNode input,
+   RelDataType rowType, String description, LogicalWindow 
windowReference) {
+   super(cluster, traitSet, input);
+
+   this.rowType = rowType;
+   this.description = description;
+   this.windowReference = windowReference;
+
+   }
+
+   @Override
+   protected RelDataType deriveRowType() {
+   // TODO Auto-generated method stub
+   return super.deriveRowType();
+   }
+
+   @Override
+   public RelNode copy(RelTraitSet traitSet, java.util.List 
inputs) {
+
+   if (inputs.size() != 1) {
+   System.err.println(this.getClass().getName() + " : 
Input size must be one!");
+   }
+
+   return new DataStreamProcTimeTimeAggregate(getCluster(), 
traitSet, inputs.get(0), getRowType(),
+   getDescription(), windowReference);
+
+   }
+
+   @Override
+   public DataStream translateToPlan(StreamTableEnvironment tableEnv) 
{
+
+   // Get the general parameters related to the datastream, 
inputs, result
+   TableConfig config = tableEnv.getConfig();
+
+   DataStream inputDataStream = ((DataStreamRel) 
getInput()).translateToPlan(tableEnv);
+
+   TypeInformation[] rowType = new 
TypeInformation[getRowType().getFieldList().size()];
--- End diff --

Not sure how you suggest to make the checks? Is there a style in flink for 
doing this?

Also related to the suggestion of using a variable for 
getRowType().getFieldList() - I will do it, but in theory modern day compilers 
such as JAVA should identify and optimize this kind of things. And therefore I 
would expect the resu

[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104366556
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
--- End diff --

@shijinkui - thanks for the comment
 I kept description because all other sql operator implementation have it. 
It was more for uniformity. Additionally, if at some point we decide to go for 
compiling functions instead of working directly against the datastream API, 
than this is typically used for naming. Because of these I propose to keep it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3459: [FLINK-5654] Add processing time OVER RANGE BETWEEN x PRE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3459
  
@shijinkui I think Radu and I complicated a bit the code pulling from each 
other branch some stuff. Next time we'll squash the commit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3459#discussion_r104362378
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/DataStreamProcTimeTimeAggregate.java
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.nodes.datastream;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
+import org.apache.flink.table.api.StreamTableEnvironment;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.plan.logical.rel.util.WindowAggregateUtil;
+import org.apache.flink.table.plan.nodes.datastream.DataStreamRel;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateGlobalWindowFunction;
+import 
org.apache.flink.table.plan.nodes.datastream.function.DataStreamProcTimeAggregateWindowFunction;
+import org.apache.flink.types.Row;
+
+public class DataStreamProcTimeTimeAggregate extends DataStreamRelJava {
+
+   private LogicalWindow windowReference;
+   private String description;
--- End diff --

@rtudoran you should have a look at this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-06 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r104361048
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream resultSet = tableEnv.toDataStream(result, 
Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-03 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r104148648
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

That is fine. I have pulled the new aggregation functions, and updated the 
PR without the reset method. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3459: [FLINK-5654] Add processing time OVER RANGE BETWEE...

2017-03-02 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3459

[FLINK-5654] Add processing time OVER RANGE BETWEEN x PRECEDING aggregation 
to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5654

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3459.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3459


commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-01T16:15:58Z

First implementation of ProcTime()

commit e98c28616af1cf67d3ad3277d9cc2ca335604eca
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-02T10:30:40Z

Disambiguate for the OVER BY clause, which should not be treated as a
RexOver expression in Logical Project

commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-02T12:07:11Z

Added return to disambiguation method for rexover

commit cda17565d5969f29b16923b631178a2cbf64791b
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-02T16:00:20Z

Enable the LogicalWindow operators in query translation

commit 4b3e54281018b83c818f91e09a5321c34bbf297b
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-03T14:59:39Z

Added a DataStreamRel version that can be extended in java

commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-03T15:35:18Z

Add skeleton for the implementation of the aggregates over sliding
window with processing time and time boundaries

commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T10:33:57Z

committing changes with stub modifications before chekout proctime
branch

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T13:17:43Z

ignore aggregation test and implemented simple proctime test

commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T14:17:03Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5653

commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T09:42:41Z

committing first key selector and utils

commit 31060e46f78729880c03e8cab0f92ff06faec4f0
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T11:16:43Z

Changed ProcTime from time to timestamp

commit 69289bad836a5fdace271b28a15ca0e309e50b17
Author: rtudoran <tudoranr...@ymail.com>
Date:   2017-02-07T13:13:23Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit 3392817045ed166df5f55d22fde34cbd98c775db
Author: rtudoran <tudoranr...@ymail.com>
Date:   2017-02-07T13:14:50Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit d2ea0076b5e3561585c4eaea84025e50beaacf9a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T09:42:41Z

fixing linelength and other issues

commit f29f564bb7fe7496b9f3d2f45a6b4469af559378
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T13:46:30Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git
into FLINK-5653

Conflicts:

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103925068
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

Indeed, the StreamAggregator interface I defined has a reset and evict 
method, to support associative function aggregation. Anyway, no point in 
discussing it. I will push my proposal and then you see :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103920777
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

I see that the AggregateFunction interface does not have a reset method if 
not the createAccumulator(), this means that we have to create an accumulator 
for every apply execution. With the reset you just assign the value to the 
starting point, without creating a new object all time. What do you think?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103919690
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

ok, so I will change my code to use the AggregateFunction you pointed out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103914685
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

Do you think this switch on aggregation could be done in another PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-02 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103887096
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

The aggregate functions in the org.apache.flink.table.runtime.aggregate.* 
assume a GroupReduce, whereas I have implemented it as a flatmap. Should I 
switch my implementation to reduce? @fhueske what do you think? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866134
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/nodes/datastream/aggs/DoubleSummaryAggregation.java
 ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.nodes.datastream.aggs;
+
+import static 
org.apache.flink.api.java.summarize.aggregation.CompensatedSum.ZERO;
+
+import org.apache.flink.api.java.summarize.aggregation.Aggregator;
+import org.apache.flink.api.java.summarize.aggregation.CompensatedSum;
+import 
org.apache.flink.api.java.summarize.aggregation.NumericSummaryAggregator;
+
+public class DoubleSummaryAggregation extends 
NumericSummaryAggregator {
--- End diff --

Good point. I searched a while, but could not find something fit, then I 
decided to create stream specific. The idea was to have something that could be 
stream optimized eventually. However, I will try to reuse existing one. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103866164
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/ProcTimeRowStreamAggregationSqlITCase.java
 ---
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProcTimeRowStreamAggregationSqlITCase extends 
StreamingMultipleProgramsTestBase {
+
+   
+   @Test
+   public void testMaxAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MAX(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+   DataStream resultSet = tableEnv.toDataStream(result, 
Row.class);
+   resultSet.addSink(new StreamITCase.StringSink());
+   env.execute();
+
+   List expected = new ArrayList<>();
+   expected.add("1,0");
+   expected.add("2,1");
+   expected.add("2,2");
+   expected.add("3,3");
+   expected.add("3,4");
+   expected.add("3,5");
+   expected.add("4,6");
+   expected.add("4,7");
+   expected.add("4,8");
+   expected.add("4,9");
+   expected.add("5,10");
+   expected.add("5,11");
+   expected.add("5,12");
+   expected.add("5,14");
+   expected.add("5,14");
+
+   StreamITCase.compareWithList(expected);
+   }
+   
+   @Test
+   public void testMinAggregatation() throws Exception {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env);
+   StreamITCase.clear();
+
+   env.setParallelism(1);
+   
+   DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = 
StreamTestData.get5TupleDataStream(env);
+   Table in = tableEnv.fromDataStream(ds, "a,b,c,d,e");
+   tableEnv.registerTable("MyTable", in);
+
+   String sqlQuery = "SELECT a, MIN(c) OVER (PARTITION BY a ORDER 
BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS maxC FROM MyTable";
+   Table result = tableEnv.sql(sqlQuery);
+
+

[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3443#discussion_r103865652
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util/WindowAggregateUtil.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.logical.rel.util;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.calcite.rel.core.Window.Group;
+import org.apache.calcite.rel.logical.LogicalWindow;
+import org.apache.calcite.rex.RexLiteral;
+
+import com.google.common.collect.ImmutableList;
+
+public class WindowAggregateUtil implements Serializable {
+
+   private static final long serialVersionUID = -3916551736243544540L;
+
+   private LogicalWindow windowPointer = null;
+
+   public WindowAggregateUtil() {
+
+   }
+
+   public WindowAggregateUtil(LogicalWindow window) {
+   this.windowPointer = window;
+
+   }
+
+   /**
+* A utility function that checks whether a window is partitioned or it 
is a
+* global window.
+* 
+* @param LogicalWindow
+*window to be checked for partitions
+* @return true if partition keys are defined, false otherwise.
+*/
+   public boolean isStreamPartitioned(LogicalWindow window) {
+   // if it exists a group bounded by keys, the it is
+   // a partitioned window
+   for (Group group : window.groups) {
+   if (!group.keys.isEmpty()) {
+   return true;
+   }
+   }
+
+   return false;
+   }
+
+   public int[] getKeysAsArray(Group group) {
+   if (group == null) {
+   return null;
+   }
+   return group.keys.toArray();
+   }
+
+   /**
+* This method returns the [[int]] lowerbound of a window when expressed
+* with an integer e.g. ... ROWS BETWEEN [[value]] PRECEDING AND 
CURRENT ROW
+* 
+* @param constants
+*the list of constant to get the offset value
+* @return return the value of the lowerbound if available -1 otherwise
+*/
+
+   public int getLowerBoundary(ImmutableList constants) {
+   return ((Long)constants.get(0).getValue2()).intValue();
--- End diff --

Thank you very much. I was indeed puzzled by this. I will fix it according 
to your suggestion. I even tried to get to the Calcite mailing list, but 
nothing like this came out. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3443: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-01 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3443

[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation 
to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3443.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3443


commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-01T16:15:58Z

First implementation of ProcTime()

commit e98c28616af1cf67d3ad3277d9cc2ca335604eca
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-02T10:30:40Z

Disambiguate for the OVER BY clause, which should not be treated as a
RexOver expression in Logical Project

commit b7e6a673b88b6181c06071cd6c7bda55c25a62b4
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-02T12:07:11Z

Added return to disambiguation method for rexover

commit cda17565d5969f29b16923b631178a2cbf64791b
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-02T16:00:20Z

Enable the LogicalWindow operators in query translation

commit 4b3e54281018b83c818f91e09a5321c34bbf297b
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-03T14:59:39Z

Added a DataStreamRel version that can be extended in java

commit cc960d699db369cc8dc4e155cc5c5f6c3baf74a4
Author: rtudoran <rtudoran@bigdata-hp3>
Date:   2017-02-03T15:35:18Z

Add skeleton for the implementation of the aggregates over sliding
window with processing time and time boundaries

commit 2390a9d3dc15afba01185c47f61a9ea830ea5acc
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T10:33:57Z

committing changes with stub modifications before chekout proctime
branch

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T13:17:43Z

ignore aggregation test and implemented simple proctime test

commit 16ccd7f5bf019803ea8b53f09a126ec53a5a6d59
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T14:17:03Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5653

commit 10f7bc5e2086e41ec76cbabdcd069c71a491671a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T09:42:41Z

committing first key selector and utils

commit 31060e46f78729880c03e8cab0f92ff06faec4f0
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T11:16:43Z

Changed ProcTime from time to timestamp

commit 69289bad836a5fdace271b28a15ca0e309e50b17
Author: rtudoran <tudoranr...@ymail.com>
Date:   2017-02-07T13:13:23Z

Merge branch 'FLINK-5710' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit 3392817045ed166df5f55d22fde34cbd98c775db
Author: rtudoran <tudoranr...@ymail.com>
Date:   2017-02-07T13:14:50Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink into 
FLINK-5654

commit d2ea0076b5e3561585c4eaea84025e50beaacf9a
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T09:42:41Z

fixing linelength and other issues

commit f29f564bb7fe7496b9f3d2f45a6b4469af559378
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-07T13:46:30Z

Merge branch 'FLINK-5653' of https://github.com/huawei-flink/flink.git
into FLINK-5653

Conflicts:

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/DataStreamWindowRowAggregate.java

flink-libraries/flink-table/src/main/java/org/apache/flink/table/plan/logical/rel/util

[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-27 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3302
  
@fhueske no problem, I understand. It was bad timing, as I was on the run 
for more than two weeks, with little time to follow this. We'll  contribute on 
other issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3370: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-21 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3370
  
@haohui thanks for the contribution. I merged your code, I will push it 
later today. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-21 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3302
  
I managed to merge the changes from PR #3370 into my branch after rebase, 
and the test works. I will push the code later today. sorry if I am a little 
latent, but I am travelling and connectivity is sometimes a problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-21 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3302
  
Hi Fabian,

I will follow up in the next days, probably early next week. is it ok?

Stefano


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-16 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3302
  
@fhueske I've addressed most of the points, however there is a thing that 
is not clear to me yet. So far, the procTime() function generates a timestamp. 
My understanding is that this is not correct, and it should be something else. 
could it be a default timestamp (e.g. epoch)? the actual timestamp normalized 
to the second? what is the best option in your opinion?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3271: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3271#discussion_r101455204
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/calcite/functions/FlinkStreamFunctionCatalog.java
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.calcite.functions;
+
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlReturnTypeInference;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+public class FlinkStreamFunctionCatalog {
+
+   /**
+* An explicit representation of TIMESTAMP as an SQL return type
+*/
+   private static final SqlReturnTypeInference TIMESTAMP = 
ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 0);
+   
+   /**
+* A a parameterless scalar function that just indicates processing 
time mode.
+*/
+   public static final SqlFunction PROCTIME = new SqlFunction("PROCTIME", 
SqlKind.OTHER_FUNCTION, TIMESTAMP, null, OperandTypes.NILADIC, 
SqlFunctionCategory.TIMEDATE);
--- End diff --

should I simply create the same class with using PROCTIME in place of 
ROWTIME?



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3271: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-15 Thread huawei-flink
Github user huawei-flink commented on a diff in the pull request:

https://github.com/apache/flink/pull/3271#discussion_r101454598
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ProcTimeCallGen.scala
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen.calls
+
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, 
TypeInformation}
+import org.apache.flink.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates function call to determine current time point (as 
date/time/timestamp) in
--- End diff --

not sure I understand this point. Should the generate method throw an 
exception, or should the code for the exception be generated?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3302: [FLINK-5710] Add ProcTime() function to indicate S...

2017-02-13 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3302

[FLINK-5710] Add ProcTime() function to indicate StreamSQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3302.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3302






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3271: FLINK-5710

2017-02-10 Thread huawei-flink
Github user huawei-flink commented on the issue:

https://github.com/apache/flink/pull/3271
  
@fhueske not sure this was noticed. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3271: Flink 5710

2017-02-06 Thread huawei-flink
GitHub user huawei-flink opened a pull request:

https://github.com/apache/flink/pull/3271

Flink 5710

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [ ] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [ ] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [ ] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5710

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3271.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3271


commit 72ec35a7380a4d73bd092ce14962ab2248139bae
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-01T16:15:58Z

First implementation of ProcTime()

commit eaf4e92784dab01b17004390968ca4b1fe7c4bea
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-02-06T13:17:43Z

ignore aggregation test and implemented simple proctime test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---