[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

2017-07-05 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3783#discussion_r125677038
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +299,39 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(): String = {
+  
+  val sig: String = 
+j"""
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+
+  val initDist: String = if( distinctAggsFlags.isDefined ) {
+val statePackage = "org.apache.flink.api.common.state"
+val distAggsFlags = distinctAggsFlags.get
+  for(i <- distAggsFlags.indices) yield
+if(distAggsFlags(i)) {
+  val typeString = javaTypes(aggFields(i)(0))
--- End diff --

Actually, why shouldn't we use rows directly? Is there any specific reason to prefer tuples?
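
For illustration, a distinct-value counter keyed by a Row (rather than a concrete Tuple type) could be set up as in the following minimal sketch; all names here are hypothetical, not code from the PR:

    // Illustrative sketch: key the per-value counter map by a single-field
    // Row so it works for any input field type, instead of fixing a TupleN.
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    def createDistinctCounterState(
        ctx: RuntimeContext,
        fieldType: TypeInformation[_],
        stateName: String): MapState[Row, java.lang.Long] = {
      // the Row wraps the aggregation input; the Long counts how many rows
      // in the current window carry that value
      val descriptor = new MapStateDescriptor[Row, java.lang.Long](
        stateName, new RowTypeInfo(fieldType), BasicTypeInfo.LONG_TYPE_INFO)
      ctx.getMapState(descriptor)
    }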




[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

2017-07-03 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3783
  
@fhueske thanks for the comments. Did we already include the latest Calcite?




[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

2017-07-03 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3783#discussion_r125291954
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala ---
@@ -20,16 +20,22 @@ package org.apache.flink.table.runtime.aggregate
 
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.types.Row
+import org.apache.flink.api.common.functions.RuntimeContext
 
 /**
   * Base class for code-generated aggregations.
   */
 abstract class GeneratedAggregations extends Function {
+  
+  /**
+* Initialize the state for the distinct aggregation check
+*
+* @param ctx the runtime context to retrieve and initialize the distinct states
+*/
+  def initialize(ctx: RuntimeContext)
 
   /**
-* Sets the results of the aggregations (partial or final) to the output row.
--- End diff --

I think it was a formatting issue.




[GitHub] flink pull request #3783: [FLINK-6388] Add support for DISTINCT into Code Ge...

2017-07-03 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3783#discussion_r125291702
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -296,6 +299,39 @@ class CodeGenerator(
   fields.mkString(", ")
 }
 
+def genInitialize(): String = {
+  
+  val sig: String = 
+j"""
+   |  public void initialize(
+   |org.apache.flink.api.common.functions.RuntimeContext ctx
+   |  )""".stripMargin
+
+  val initDist: String = if( distinctAggsFlags.isDefined ) {
+val statePackage = "org.apache.flink.api.common.state"
+val distAggsFlags = distinctAggsFlags.get
+  for(i <- distAggsFlags.indices) yield
+if(distAggsFlags(i)) {
+  val typeString = javaTypes(aggFields(i)(0))
--- End diff --

sounds good
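
For context, a sketch of the Java snippet such a generator could emit for one distinct aggregate; names like distinctStateDesc and distinctValueState are hypothetical, and the standard s interpolator stands in for the codegen's j interpolator:

    // Illustrative sketch, not the PR's actual genInitialize() output.
    val statePackage = "org.apache.flink.api.common.state"
    val i = 0                          // index of the distinct aggregate
    val typeString = "java.lang.Long"  // stands in for javaTypes(aggFields(i)(0))
    val initOneDistinctState: String =
      s"""
         |$statePackage.MapStateDescriptor distinctStateDesc$i =
         |  new $statePackage.MapStateDescriptor(
         |    "distinctValuesState$i", $typeString.class, java.lang.Long.class);
         |distinctValueState$i = ctx.getMapState(distinctStateDesc$i);
         |""".stripMargin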




[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-26 Thread stefanobortoli
Github user stefanobortoli closed the pull request at:

https://github.com/apache/flink/pull/3771




[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-26 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske @sunjincheng121 @shijinkui @hongyuhong I have created a PR (#3771) against the latest master with the code-generated distinct; please have a look. If it is fine, we can basically support distinct for all the window types.




[GitHub] flink issue #3771: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-25 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3771
  
@fhueske @rtudoran @shijinkui @sunjincheng121 I have created a new PR for distinct in the code generator. Please have a look and let me know. I have implemented and tested it only for the OverProcTimeRowBounded window, but if you like it I can quickly implement and test the others as well.




[GitHub] flink pull request #3771: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-25 Thread stefanobortoli
GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/3771

[FLINK-6250] Distinct procTime with Rows boundaries

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-6250b

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3771.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3771








[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske I have just pushed a version working with code generation (without modifying the code generation). There will be a need for some refactoring in the AggregateUtil function, but if the overall concept is sound, I will fix things.

@hongyuhong, @shijinkui you could also have a look if you have time.




[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112725482
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UnsupportedOperatorsIndicatorFunctions.scala ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.functions
+
+import java.nio.charset.Charset
+import java.util.List
+
+import org.apache.calcite.rel.`type`._
+import org.apache.calcite.sql._
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeFamily, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.LeafExpression
+import org.apache.calcite.sql.`type`.InferTypes
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.validate.SqlValidatorScope
+
+/**
+ * An SQL Function DISTINCT() used to mark the DISTINCT operator
+ * on aggregation input. This is a temporary workaround until
+ * https://issues.apache.org/jira/browse/CALCITE-1740 is solved.
+ */
+object DistinctAggregatorExtractor extends SqlFunction("DIST", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.ARG0, InferTypes.RETURN_TYPE,
+  OperandTypes.NUMERIC, SqlFunctionCategory.NUMERIC) {
--- End diff --

one never stops learning. :-)
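
For illustration, a query using the DIST() marker could look like the sketch below, written in the style of the module's SQL tests; this is hypothetical, since the final syntax depends on the Calcite fix:

    // Hypothetical usage of the temporary DIST() marker, not final syntax.
    val sql = "SELECT " +
      "c, " +
      "count(DIST(a)) OVER (PARTITION BY c ORDER BY procTime() " +
      "ROWS BETWEEN 2 preceding AND CURRENT ROW) as cnt1 " +
      "from MyTable"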





[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112725820
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -91,6 +93,22 @@ class DataStreamOverAggregate(
 
val overWindow: org.apache.calcite.rel.core.Window.Group = logicWindow.groups.get(0)
 
+val distinctVarMap: Map[String,Boolean] = new HashMap[String, Boolean]
--- End diff --

This is a good point. The string trick is a temporary workaround anyway.




[GitHub] flink issue #3732: [FLINK-6250] Distinct procTime with Rows boundaries

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3732
  
@fhueske, I agree with you about the risk of the temporary DIST() ingestion. Perhaps we could meanwhile just work on the "ProcessFunction + Code generation" approach, keeping the DIST function for test purposes. My concern is that the code may change again and all the work would just be wasted. To be honest, code generation is quite new to me, and I will have to learn to work with it. Meanwhile, I have almost completed a version that relies on the current code generation, nesting the distinct logic. As it is almost done, I will share this one as well and then, if necessary, move to the code generation. What do you think?




[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112713096
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+distinctValueStateList = new Array(aggregates.size)
+f
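
For context, the distinct bookkeeping in processElement could look roughly like the sketch below; the names come from the constructor above plus assumed locals (input, accumulators), and this is illustrative, not the PR's exact code:

    // Sketch of the accumulate path: a distinct aggregate only sees a value
    // the first time it enters the window; later occurrences just increment
    // the per-value counter kept in distinctValueStateList(i).
    var i = 0
    while (i < aggregates.length) {
      val acc = accumulators.getField(i).asInstanceOf[Accumulator]
      val value = input.getField(aggFields(i)(0))
      if (distinctAggsFlag(i)) {
        if (distinctValueStateList(i).contains(value)) {
          // already contributes to the aggregate; only bump its counter
          distinctValueStateList(i).put(value, distinctValueStateList(i).get(value) + 1L)
        } else {
          aggregates(i).accumulate(acc, value)
          distinctValueStateList(i).put(value, 1L)
        }
      } else {
        aggregates(i).accumulate(acc, value)
      }
      i += 1
    }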

[GitHub] flink pull request #3732: [FLINK-6250] Distinct procTime with Rows boundarie...

2017-04-21 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3732#discussion_r112712717
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedDistinctRowsOver.scala ---
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{List => JList}
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.state.ListState
+
+class ProcTimeBoundedDistinctRowsOver(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Array[Int]],
+  private val distinctAggsFlag: Array[Boolean],
+  private val precedingOffset: Long,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkNotNull(distinctAggsFlag)
+  Preconditions.checkNotNull(distinctAggsFlag.length == aggregates.length)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var smallestTsState: ValueState[Long] = _
+  private var distinctValueStateList: Array[MapState[Any, Long]] = _
+
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a Map state keyed
+// by the ingestion time in the operator.
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val aggregationStateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+accumulatorState = getRuntimeContext.getState(aggregationStateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallestTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallestTSState", classOf[Long])
+smallestTsState = getRuntimeContext.getState(smallestTimestampDescriptor)
+distinctValueStateList = new Array(aggregates.size)
+f

[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-30 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3574
  
@fhueske @sunjincheng121 the latest merge caused a conflict; however, I have already pushed a new branch and created a new PR. This one can be closed.




[GitHub] flink issue #3653: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-30 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3653
  
@fhueske @sunjincheng121 I have merged with the most recent branch, using the freshly merged function. I think the PR is good to merge now.




[GitHub] flink pull request #3653: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-30 Thread stefanobortoli
GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/3653

[FLINK-5653] Add processing time OVER ROWS BETWEEN x PRECEDING aggregation 
to SQL

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huawei-flink/flink FLINK-5653d

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3653


commit 5ca561b0a3cec68e9386286eb445275ba9b4ce77
Author: Stefano Bortoli <s.bort...@gmail.com>
Date:   2017-03-30T09:28:41Z

Over aggregation with row range and procTime semantics






[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3574
  
@fhueske @sunjincheng121 @rtudoran sorry for the extra commit, I saw some comments a little too late. Now the code should comply with the requested changes.




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108703268
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val precedingOffset: Int,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(precedingOffset > 0)
+
+  private var accumulators: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  private var output: Row = _
+  private var counterState: ValueState[Long] = _
+  private var counter : Long = _
+  private var smallestTsState: ValueState[Long] = _
+  private var smallestTs : Long = _
+  
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+// We keep the elements received in a list state 
+// together with the ingestion time in the operator
+// we also keep counter of processed elements
+// and timestamp of oldest element
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+  
+accumulatorState = getRuntimeContext.getState(stateDescriptor)
+
+val processedCountDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("processedCountState", classOf[Long])
+
+counterState = getRuntimeContext.getState(processedCountDescriptor)
+
+val smallesTimestampDescriptor : ValueStateDescriptor[Long] =
+   new ValueStateDescriptor[Long]("smallesTSState", classOf[Long])
+
+smallestTsState = getRuntimeContext.getState(smallesTimestampDescriptor)
+
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTi
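
For context, a minimal sketch of the count-based eviction this function performs once precedingOffset rows are buffered, using the state fields declared above; illustrative only, not the exact PR code:

    // Sketch: retract the oldest buffered row from every accumulator before
    // the new row is accumulated, keeping exactly precedingOffset rows.
    if (counter == precedingOffset) {
      val oldestRows: JList[Row] = rowMapState.get(smallestTs)
      val retractRow: Row = oldestRows.get(0)
      var i = 0
      while (i < aggregates.length) {
        val acc = accumulators.getField(i).asInstanceOf[Accumulator]
        aggregates(i).retract(acc, retractRow.getField(aggFields(i)))
        i += 1
      }
      oldestRows.remove(0)
      if (oldestRows.isEmpty) {
        rowMapState.remove(smallestTs)
        // smallestTs then advances to the next-oldest buffered timestamp
      } else {
        rowMapState.put(smallestTs, oldestRows)
      }
    } else {
      counter += 1
      counterState.update(counter)
    }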

[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108626444
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
@@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase {
 streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+  term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+),
+term("select", "c", "w0$o0 AS $1")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+  
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 
preceding AND " +
--- End diff --

I have tested the 4 situations that Fabian thought sufficient for our purpose. Whether it is 3 or 4 changes nothing besides the work of manually assembling the test. :-) Of course, unless the aggregations are unreliable and summing 3 numbers rather than 4 could have an impact. :-D




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108625887
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.state.ListStateDescriptor
+import org.apache.flink.api.common.state.ListState
+import org.apache.flink.api.common.typeinfo.TypeHint
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val bufferSize: Int,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(bufferSize > 0)
+
+  private var accumulators: Row = _
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+   // We keep the elements received in a list state 
+// together with the ingestion time in the operator
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+  
+accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTime = ctx.timerService().currentProcessingTime()
+var i = 0
+
+var accumulators = accumulatorState.value()
+// initialize state for the first processed element
+if(accumulators == null){
+  accumulators = new Row(aggregates.length)
+  while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+  }
+}
+
+val keyIter = rowMapState.keys.iterator
+var oldestTimeStamp = currentTime
+var toRetract: JList[Row] = null
+var currentKeyTime: Long = 0L

[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108611315
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -195,6 +194,45 @@ class DataStreamOverAggregate(
 result
   }
 
+  def createBoundedAndCurrentRowProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+// window size is lowerbound +1 to comply with over semantics 
+val lowerbound: Int = AggregateUtil.getLowerBoundary(
--- End diff --

sure
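
As a quick sanity check of the "lowerbound + 1" comment in the excerpt (illustrative values):

    // ROWS BETWEEN 2 PRECEDING AND CURRENT ROW spans 3 rows in total:
    val lowerbound: Int = 2               // boundary from the OVER clause
    val windowSize: Int = lowerbound + 1  // 2 preceding rows + the current row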




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108611259
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
@@ -696,6 +696,181 @@ class SqlITCase extends StreamingWithStateTestBase {
   "6,8,Hello world,51,9,5,9,1")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+  
+ //
+   // START TESTING BOUNDED PROC TIME ROW AGGREGATION
+   //
+  
+
+  @Test
+  def testSumMinAggregatation2(): Unit = {
--- End diff --

@fhueske suggested the 4 tests, and 4 I implemented. It is the 5th time I have implemented the tests; we can leave it like that. :-)




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108610311
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
@@ -350,4 +350,59 @@ class WindowAggregateTest extends TableTestBase {
 streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
+  term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+),
+term("select", "c", "w0$o0 AS $1")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+  
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 
preceding AND " +
--- End diff --

why?




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108610149
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.state.ListStateDescriptor
+import org.apache.flink.api.common.state.ListState
+import org.apache.flink.api.common.typeinfo.TypeHint
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val bufferSize: Int,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(bufferSize > 0)
+
+  private var accumulators: Row = _
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+   // We keep the elements received in a list state 
+// together with the ingestion time in the operator
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+  
+accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTime = ctx.timerService().currentProcessingTime()
+var i = 0
+
+var accumulators = accumulatorState.value()
+// initialize state for the first processed element
+if(accumulators == null){
+  accumulators = new Row(aggregates.length)
+  while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+  }
+}
+
+val keyIter = rowMapState.keys.iterator
+var oldestTimeStamp = currentTime
+var toRetract: JList[Row] = null
+var currentKeyTime: Long =

[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108610034
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.state.ListStateDescriptor
+import org.apache.flink.api.common.state.ListState
+import org.apache.flink.api.common.typeinfo.TypeHint
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val bufferSize: Int,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(bufferSize > 0)
+
+  private var accumulators: Row = _
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+   // We keep the elements received in a list state 
+// together with the ingestion time in the operator
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+  
+accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTime = ctx.timerService().currentProcessingTime()
+var i = 0
+
+var accumulators = accumulatorState.value()
+// initialize state for the first processed element
+if(accumulators == null){
+  accumulators = new Row(aggregates.length)
+  while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+  }
+}
+
+val keyIter = rowMapState.keys.iterator
+var oldestTimeStamp = currentTime
+var toRetract: JList[Row] = null
--- End diff --

details, but o

[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-29 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108609894
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.state.ListStateDescriptor
+import org.apache.flink.api.common.state.ListState
+import org.apache.flink.api.common.typeinfo.TypeHint
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+class BoundedProcessingOverRowProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val bufferSize: Int,
+  private val forwardedFieldCount: Int,
+  private val aggregatesTypeInfo: RowTypeInfo,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+  Preconditions.checkArgument(bufferSize > 0)
+
+  private var accumulators: Row = _
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+  
+  override def open(config: Configuration) {
+
+output = new Row(forwardedFieldCount + aggregates.length)
+   // We keep the elements received in a list state 
+// together with the ingestion time in the operator
+val rowListTypeInfo: TypeInformation[JList[Row]] =
+  new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+
+val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+  new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState",
+BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+
+rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("aggregationState", aggregatesTypeInfo)
+  
+accumulatorState = getRuntimeContext.getState(stateDescriptor)
+  }
+
+  override def processElement(
+input: Row,
+ctx: ProcessFunction[Row, Row]#Context,
+out: Collector[Row]): Unit = {
+
+val currentTime = ctx.timerService().currentProcessingTime()
+var i = 0
+
+var accumulators = accumulatorState.value()
+// initialize state for the first processed element
+if(accumulators == null){
+  accumulators = new Row(aggregates.length)
+  while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+  }
+}
+
+val keyIter = rowMapState.keys.iterator
+var oldestTimeStamp = currentTime
+var toRetract: JList[Row] = null
+var currentKeyTime: Long = 0L

[GitHub] flink issue #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN x PREC...

2017-03-28 Thread stefanobortoli
Github user stefanobortoli commented on the issue:

https://github.com/apache/flink/pull/3574
  
@fhueske @sunjincheng121 @rtudoran I have just completed the implementation 
with the MapState, please have a look. 
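
For context, a sketch of how such a MapState-based buffer can be scanned and
the oldest row retracted once the buffer is full. This is a hypothetical
continuation of the truncated diff above; it reuses the snippet's names
(rowMapState, aggregates, aggFields, bufferSize, accumulators) but is not
necessarily the code that was merged:

    // first pass: count the buffered rows and find the oldest timestamp
    val keyIter = rowMapState.keys.iterator
    var dataCount: Long = 0L
    var oldestTimestamp: Long = currentTime
    while (keyIter.hasNext) {
      val curTime = keyIter.next
      dataCount += rowMapState.get(curTime).size()
      if (curTime < oldestTimestamp) {
        oldestTimestamp = curTime
      }
    }
    // if the buffer is full, retract the oldest row from every accumulator
    // and drop it from the MapState
    if (dataCount >= bufferSize) {
      val oldestRows = rowMapState.get(oldestTimestamp)
      val retractRow = oldestRows.remove(0)
      var j = 0
      while (j < aggregates.length) {
        val accumulator = accumulators.getField(j).asInstanceOf[Accumulator]
        aggregates(j).retract(accumulator, retractRow.getField(aggFields(j)))
        j += 1
      }
      if (oldestRows.isEmpty) rowMapState.remove(oldestTimestamp)
      else rowMapState.put(oldestTimestamp, oldestRows)
    }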




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-28 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108398986
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 ---
@@ -239,4 +239,61 @@ class WindowAggregateTest extends TableTestBase {
   )
 streamUtil.verifySql(sql, expected)
   }
+  
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
+  term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+),
+term("select", "c", "w0$o0 AS $1")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+  
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 
preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("partitionBy", "c"),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
--- End diff --

ok, now it works as you suggested




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-28 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108372957
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 ---
@@ -239,4 +239,61 @@ class WindowAggregateTest extends TableTestBase {
   )
 streamUtil.verifySql(sql, expected)
   }
+  
+  @Test
+  def testBoundedNonPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (ORDER BY procTime() ROWS BETWEEN 2 preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
+  term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+),
+term("select", "c", "w0$o0 AS $1")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+  
+  @Test
+  def testBoundedPartitionedProcessingWindowWithRow() = {
+val sql = "SELECT " +
+  "c, " +
+  "count(a) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 
preceding AND " +
+  "CURRENT ROW) as cnt1 " +
+  "from MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamCalc",
+unaryNode(
+  "DataStreamOverAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "a", "c", "PROCTIME() AS $2")
+  ),
+  term("partitionBy", "c"),
+  term("orderBy", "PROCTIME"),
+  term("rows", "BETWEEN $3 PRECEDING AND CURRENT ROW"),
--- End diff --

If I add the value, the test won't pass. That is how the query is parsed by 
Calcite: constants have to be resolved to get the lower boundary. Not sure I got 
your point.
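
As background, the "$3" in "BETWEEN $3 PRECEDING" is a reference that indexes
past the input row's fields into the over window's constant pool. A sketch of
the resolution (the helper name is made up; Calcite's Window.constants holds
the RexLiterals):

    // resolves an over-window bound such as "$3 PRECEDING" to its value;
    // with a 3-field input (a, c, PROCTIME), "$3" points at constant 0
    def getLowerBoundary(
        logicWindow: org.apache.calcite.rel.core.Window,
        boundRef: org.apache.calcite.rex.RexInputRef,
        inputFieldCount: Int): Long = {
      val constantIndex = boundRef.getIndex - inputFieldCount
      val literal = logicWindow.constants.get(constantIndex)
      literal.getValue2.asInstanceOf[Number].longValue() // e.g. 2 for "2 PRECEDING"
    }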




[GitHub] flink pull request #3574: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-28 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/3574#discussion_r108369391
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateOverWindowFunction.scala
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.util.Preconditions
+import org.apache.flink.table.functions.Accumulator
+import java.lang.Iterable
+
+class IncrementalAggregateOverWindowFunction[W <: Window](
+  private val numGroupingKey: Int,
+  private val numAggregates: Int,
+  private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+  private var output: Row = _
+  private var reuse: Row = _
+
+  override def open(parameters: Configuration): Unit = {
+output = new Row(forwardedFieldCount + numAggregates)
+  }
+  override def apply(
+key: Tuple,
+window: W,
+records: Iterable[Row],
+out: Collector[Row]): Unit = {
+
+var i = 0
+val iter = records.iterator
+while (iter.hasNext) {
+  reuse = iter.next
--- End diff --

extends and implements are different in Java, really; it was just Scala 
confusion on my part. :-)
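
For the record, Scala uses a single keyword for both cases, which is the usual
source of that confusion. A minimal illustration with made-up names:

    trait Retractable {            // would be an interface in Java
      def retract(value: Any): Unit
    }

    abstract class BaseFunction    // would be an abstract class in Java

    // Java would write "extends BaseFunction implements Retractable";
    // Scala writes "extends ... with ..." for both.
    class MyFunction extends BaseFunction with Retractable {
      override def retract(value: Any): Unit = ()
    }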




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-05-03 Thread stefanobortoli
Github user stefanobortoli commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-216521677
  
@tillrohrmann, I see the conflicts. How should I deal with this? Rebase?




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-05-03 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/1915#discussion_r61855824
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/StartupUtils.java ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.util.List;
+
+public class StartupUtils {
+   
+   /**
+    * A utility method to analyze the exceptions and collect the causes.
+    *
+    * @param e  the root exception (Throwable) object
+    * @param causes  the list of exceptions that caused the root exceptions
+    * @return  a list of Throwable
+    */
+   public List<Throwable> getExceptionCauses(Throwable e, List<Throwable> causes) {
--- End diff --

on it
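
For reference, the cause-walking itself is small; a hypothetical Scala
equivalent of getExceptionCauses (assuming the cause chain is acyclic, which
holds unless a cycle is constructed explicitly):

    import scala.annotation.tailrec

    // collects a Throwable and all of its causes, outermost first
    @tailrec
    def collectCauses(e: Throwable, acc: List[Throwable] = Nil): List[Throwable] =
      if (e == null) acc.reverse
      else collectCauses(e.getCause, e :: acc)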




[GitHub] flink pull request: [FLINK-1827] and small fixes in some tests

2016-04-26 Thread stefanobortoli
Github user stefanobortoli commented on a diff in the pull request:

https://github.com/apache/flink/pull/1915#discussion_r61037816
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 ---
@@ -53,41 +59,55 @@ public void testStartupWhenTaskmanagerActorPortIsUsed() {
            blocker = new ServerSocket(0, 50, localAddress);
            final int port = blocker.getLocalPort();

-           try {
-               TaskManager.runTaskManager(
-                   localHostName,
-                   ResourceID.generate(),
-                   port,
-                   new Configuration(),
+           TaskManager.runTaskManager(localHostName, ResourceID.generate(), port, new Configuration(),
                TaskManager.class);
-               fail("This should fail with an IOException");
-           }
-           catch (IOException e) {
-               // expected. validate the error message
-               assertNotNull(e.getMessage());
-               assertTrue(e.getMessage().contains("Address already in use"));
+           fail("This should fail with an IOException");
+
+       } catch (IOException e) {
+           // expected. validate the error message
+           List<Throwable> causes = getExceptionCauses(e, new ArrayList<Throwable>());
+           for (Throwable cause : causes) {
+               if (cause instanceof BindException) {
+                   throw (BindException) cause;
+               }
            }
-
-       }
-       catch (Exception e) {
+           fail("This should fail with an exception caused by BindException");
+       } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
-       }
-       finally {
+       } finally {
            if (blocker != null) {
                try {
                    blocker.close();
-               }
-               catch (IOException e) {
+               } catch (IOException e) {
                    // no need to log here
                }
            }
        }
    }

    /**
-    * Tests that the TaskManager startup fails synchronously when the I/O directories are
-    * not writable.
+    * A utility method to analyze the exceptions and collect the causes
+    *
+    * @param e
+    *            the root exception (Throwable) object
+    * @param causes
+    *            the list of exceptions that caused the root exceptions
+    * @return a list of Throwable
+    */
+   private List<Throwable> getExceptionCauses(Throwable e, List<Throwable> causes) {
--- End diff --

sure I can do that




[GitHub] flink pull request: Flink 1827 and small fixes in some tests

2016-04-20 Thread stefanobortoli
Github user stefanobortoli commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-212305774
  
We had a look at the Flink test dependency hierarchies and the thing is 
feasible, but it should be done in a separate PR. In fact, it requires a 
careful review of all projects and better management of the tests' 
transitive dependencies, which right now are very redundant. For example, if I 
need _flink-optimizer_ I do not also need to specify the dependencies on 
_flink-core_ and _flink-runtime_, because they are included transitively.




[GitHub] flink pull request: Flink 1827 and small fixes in some tests

2016-04-20 Thread stefanobortoli
Github user stefanobortoli commented on the pull request:

https://github.com/apache/flink/pull/1915#issuecomment-212293101
  
We mainly wanted to simplify the build by skipping tests. However, if you 
think it makes sense, we can give it a try. 
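
(As an aside, skipping test execution is a standard Maven flag, nothing
specific to this PR:)

    $ mvn clean install -DskipTests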




[GitHub] flink pull request: Flink 1827 and small fixes in some tests

2016-04-19 Thread stefanobortoli
GitHub user stefanobortoli opened a pull request:

https://github.com/apache/flink/pull/1915

Flink 1827 and small fixes in some tests

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the [How To 
Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stefanobortoli/flink FLINK-1827

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1915


commit 88483205c57884303c9e46159df9d89aa953b547
Author: okkam <okkam@okkamvm>
Date:   2016-04-19T09:50:59Z

FLINK-1827 fixed compilation that skips tests

commit 5a5cc98ba4fc407fe0fc18d7a57dc4455ce80001
Author: okkam <okkam@okkamvm>
Date:   2016-04-19T09:50:59Z

FLINK-1827 fixed compilation that skips tests

commit 85b1763ea84a59258a3a4fef87d05650e99b5be0
Author: okkam <okkam@okkamvm>
Date:   2016-04-19T09:58:13Z

Merge remote-tracking branch 'origin/FLINK-1827' into FLINK-1827

Conflicts:
flink-test-utils/pom.xml

commit 310b4e901283fde317dc97c9d454341d331f5d04
Author: okkam <okkam@okkamvm>
Date:   2016-04-19T16:37:55Z

FLINK-1827 and improved tests removing references to localized messages



