[jira] [Commented] (FLINK-5754) released tag missing .gitignore .travis.yml .gitattributes

2017-03-17 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929948#comment-15929948
 ] 

Greg Hogan commented on FLINK-5754:
---

[~shijinkui] it looks like the objective is to tag the release. Since the 
release is not a git repo, the associated files (.gitignore, .travis.yml, 
.gitattributes, etc.) are deleted.

Why are you not working off the {{release-1.2}} branch or even 
{{release-1.2.0^}}?

> released tag missing .gitignore .travis.yml .gitattributes
> 
>
> Key: FLINK-5754
> URL: https://issues.apache.org/jira/browse/FLINK-5754
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: shijinkui
>
> released tag missing .gitignore .travis.yml .gitattributes.
> When making a release, only the version should be replaced.
> For example: https://github.com/apache/spark/tree/v2.1.0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5650) Flink-python tests executing cost too long time

2017-03-17 Thread shijinkui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929946#comment-15929946
 ] 

shijinkui commented on FLINK-5650:
--

Good job, Thanks.

> Flink-python tests executing cost too long time
> ---
>
> Key: FLINK-5650
> URL: https://issues.apache.org/jira/browse/FLINK-5650
> Project: Flink
>  Issue Type: Bug
>  Components: Python API, Tests
>Affects Versions: 1.2.0, 1.3.0
>Reporter: shijinkui
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: osx
> Fix For: 1.3.0, 1.2.1
>
>
> When executing `mvn clean test` in flink-python, it waits more than half an 
> hour after the console output below:
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> log4j:WARN No appenders could be found for logger 
> (org.apache.flink.python.api.PythonPlanBinderTest).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
> info.
> The stack trace is below:
> "main" prio=5 tid=0x7f8d7780b800 nid=0x1c03 waiting on condition 
> [0x79fd8000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
>   at java.lang.Thread.sleep(Native Method)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.startPython(PythonPlanStreamer.java:70)
>   at 
> org.apache.flink.python.api.streaming.plan.PythonPlanStreamer.open(PythonPlanStreamer.java:50)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.startPython(PythonPlanBinder.java:211)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.runPlan(PythonPlanBinder.java:141)
>   at 
> org.apache.flink.python.api.PythonPlanBinder.main(PythonPlanBinder.java:114)
>   at 
> org.apache.flink.python.api.PythonPlanBinderTest.testProgram(PythonPlanBinderTest.java:83)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:174)
> this is the jstack:
> https://gist.github.com/shijinkui/af47e8bc6c9f748336bf52efd3df94b0



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3543: [FLINK-5985] [Backport for 1.2] Report no task states for...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3543
  
Looks good.

+1 for merging this!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5985) Flink treats every task as stateful (making topology changes impossible)

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929932#comment-15929932
 ] 

ASF GitHub Bot commented on FLINK-5985:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3543
  
Looks good.

+1 for merging this!


> Flink treats every task as stateful (making topology changes impossible)
> 
>
> Key: FLINK-5985
> URL: https://issues.apache.org/jira/browse/FLINK-5985
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Gyula Fora
>Priority: Critical
>
> It seems that Flink treats every Task as stateful, so changing the topology 
> is not possible without setting a uid on every single operator.
> If the topology has an iteration this is virtually impossible (or at least 
> gets super hacky).
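
For illustration, a minimal sketch of what "setting a uid on every operator" looks like in the DataStream Scala API (the object name {{UidSketch}} and the uid strings are made up here); pinning uids keeps savepoint state addressable when the surrounding topology changes:

{code}
import org.apache.flink.streaming.api.scala._

object UidSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Pinning a uid on each stateful operator keeps its savepoint state
    // addressable even when the topology around it changes.
    env.fromElements(1, 2, 3, 4)
      .map(_ * 2).uid("double-values")
      .keyBy(_ % 2)
      .reduce(_ + _).uid("running-sum")
      .print()

    env.execute("uid-sketch")
  }
}
{code}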



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106644720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

Out of curiosity: Why do you need the manifest? I think you don't need it 
as you don't reference `m` anywhere...

Also, I think the common way of doing this would be:
```scala
def ROW[T: Manifest](types: TypeInformation[_]*) = { ...
```
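
For reference, a small self-contained sketch of the difference being discussed (the object and method names below are illustrative, not the final API): the context bound `[T: Manifest]` desugars to exactly the implicit `Manifest[T]` parameter, and if `m` is never referenced the bound can be dropped entirely.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo

object RowSignatureSketch {
  // Explicit implicit parameter, as in the current PR.
  def rowExplicit[T](types: TypeInformation[_]*)(implicit m: Manifest[T]): RowTypeInfo =
    new RowTypeInfo(types: _*)

  // Equivalent context-bound form suggested in the review.
  def rowBound[T: Manifest](types: TypeInformation[_]*): RowTypeInfo =
    new RowTypeInfo(types: _*)

  // If the Manifest is never used, it can simply be omitted.
  def row(types: TypeInformation[_]*): RowTypeInfo =
    new RowTypeInfo(types: _*)
}
```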


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Found one more small concern (inline comment)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929928#comment-15929928
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Found one more small concern (inline comment)


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to define the type. If the first Row in the collection has the 
> wrong number of fields (because there are nulls), the TypeExtractor returns 
> GenericType instead of RowTypeInfo.
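
As an illustration of the workaround, a hedged sketch that supplies the {{RowTypeInfo}} explicitly instead of relying on type extraction ({{RowTypeSketch}} and the two-field layout are made-up example names):

{code}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

object RowTypeSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // The first element has a null field, so type extraction alone
    // cannot infer a proper RowTypeInfo from it.
    val r1 = new Row(2); r1.setField(0, "a"); r1.setField(1, null)
    val r2 = new Row(2); r2.setField(0, "b"); r2.setField(1, Int.box(1))

    // Providing the RowTypeInfo explicitly avoids the GenericType fallback.
    implicit val rowType: TypeInformation[Row] =
      new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

    env.fromCollection(Seq(r1, r2)).print()
  }
}
{code}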



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929927#comment-15929927
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3127#discussion_r106644720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -17,29 +17,51 @@
  */
 package org.apache.flink.table.api
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
SqlTimeTypeInfo}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.api.java.typeutils.{Types => JTypes}
 
 /**
   * This class enumerates all supported types of the Table API.
   */
-object Types {
+object Types extends JTypes {
 
-  val STRING = BasicTypeInfo.STRING_TYPE_INFO
-  val BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO
+  val STRING = JTypes.STRING
+  val BOOLEAN = JTypes.BOOLEAN
 
-  val BYTE = BasicTypeInfo.BYTE_TYPE_INFO
-  val SHORT = BasicTypeInfo.SHORT_TYPE_INFO
-  val INT = BasicTypeInfo.INT_TYPE_INFO
-  val LONG = BasicTypeInfo.LONG_TYPE_INFO
-  val FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO
-  val DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO
-  val DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO
+  val BYTE = JTypes.BYTE
+  val SHORT = JTypes.SHORT
+  val INT = JTypes.INT
+  val LONG = JTypes.LONG
+  val FLOAT = JTypes.FLOAT
+  val DOUBLE = JTypes.DOUBLE
+  val DECIMAL = JTypes.DECIMAL
 
-  val DATE = SqlTimeTypeInfo.DATE
-  val TIME = SqlTimeTypeInfo.TIME
-  val TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP
+  val SQL_DATE = JTypes.SQL_DATE
+  val SQL_TIME = JTypes.SQL_TIME
+  val SQL_TIMESTAMP = JTypes.SQL_TIMESTAMP
   val INTERVAL_MONTHS = TimeIntervalTypeInfo.INTERVAL_MONTHS
   val INTERVAL_MILLIS = TimeIntervalTypeInfo.INTERVAL_MILLIS
 
+  /**
+* Generates RowTypeInfo with default names (f1, f2 ..).
+* same as ``new RowTypeInfo(types)``
+*
+* @param types of Row fields. e.g. ROW(Types.STRING, Types.INT)
+*/
+  def ROW[T](types: TypeInformation[_]*)(implicit m: Manifest[T]) = {
--- End diff --

Out of curiosity: Why do you need the manifest? I think you don't need it 
as you don't reference `m` anywhere...

Also, I think the common way of doing would be:
```scala
def ROW[T: Manifest](types: TypeInformation[_]*) = { ...
```


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to define the type. If the first Row in the collection has the 
> wrong number of fields (because there are nulls), the TypeExtractor returns 
> GenericType instead of RowTypeInfo.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5481) Simplify Row creation

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929925#comment-15929925
 ] 

ASF GitHub Bot commented on FLINK-5481:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good to me, +1

@twalthr @fhueske Any concerns about merging this?


> Simplify Row creation
> -
>
> Key: FLINK-5481
> URL: https://issues.apache.org/jira/browse/FLINK-5481
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Solovev
>Assignee: Anton Solovev
>Priority: Trivial
>
> When we use {{ExecutionEnvironment#fromElements(X... data)}}, it takes the first 
> element of {{data}} to define the type. If the first Row in the collection has the 
> wrong number of fields (because there are nulls), the TypeExtractor returns 
> GenericType instead of RowTypeInfo.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3127: [FLINK-5481] Simplify Row creation

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3127
  
Looks good to me, +1

@twalthr @fhueske Any concerns about merging this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5668) passing taskmanager configuration through taskManagerEnv instead of file

2017-03-17 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929916#comment-15929916
 ] 

Stephan Ewen commented on FLINK-5668:
-

There are other config files and config objects that are passed as well, in 
addition to the configuration. For example, the new YARN mode will not behave 
like "start cluster / submit job" but directly starts a JobGraph on YARN. The 
JobGraph also needs to be persisted.

> passing taskmanager configuration through taskManagerEnv instead of file
> 
>
> Key: FLINK-5668
> URL: https://issues.apache.org/jira/browse/FLINK-5668
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: Bill Liu
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> When creating a Flink cluster on YARN, the JobManager depends on HDFS to share 
> taskmanager-conf.yaml with the TaskManagers.
> It would be better to serve taskmanager-conf.yaml from the JobManager web server 
> instead of HDFS, which would reduce the HDFS dependency at job startup.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6098) Cassandra sink freezes after write error

2017-03-17 Thread Jakub Nowacki (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929909#comment-15929909
 ] 

Jakub Nowacki commented on FLINK-6098:
--

Well, it doesn't. :) As you can see from the above logs, which are both for the 
same job, it kept running until I killed it the next day. It kept declining the 
checkpoints though, which is an indicator, but that does not help much. On the 
source side, if Kafka fails the job does, indeed, get restarted.

> Cassandra sink freezes after write error
> 
>
> Key: FLINK-6098
> URL: https://issues.apache.org/jira/browse/FLINK-6098
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.2.0
> Environment: Flink 1.2.0, standalone cluster, Debian GNU/Linux 8.7 
> (jessie)
>Reporter: Jakub Nowacki
>
> I am having a problem with a very simple pipeline taking messages from Kafka 
> and writing them into Cassandra. The pipeline runs fine for a number of days 
> and at some point I get the below error in the taskmanager logs:
> {code}
> 2017-03-13 16:01:50,699 ERROR 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error 
> while sending value.
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency LOCAL_ONE (1 replica were required but only 
> 0 acknowledged the write)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: 
> Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were 
> required but only 0 acknowledged the write)
> {code}
> The job seems to be running fine, but it does not process any messages, which 
> is only visible in the metrics and in the JobManager log:
> {code}
> 2017-03-14 14:00:44,611 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 42288 @ 1489496444610
> 2017-03-14 14:00:44,612 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 42288 because of checkpoint decline from task 
> 35926157acfb1b68d1f6c9abcd90c8b4 : Task Source: Custom Source -> Map -> Map 
> -> Sink: Cassandra Sink (1/2) was not running
> {code}
> I know this is some Cassandra hiccup, but in theory the pipeline should recover 
> after a failure, or fail and stop.
> Everything seems fine and I didn't find any information specific to 
> reconnecting after a failure for the Cassandra Connector. The only thing I'm not 
> sure is done correctly is the ClusterBuilder; i.e. I use the code below 
> in the job definition's main method (in Scala):
> {code:java}
> val cassandraServers = parameters.get("cassandra.servers", 
> "localhost").split(",")
> val cassandraUser = parameters.get("cassandra.user")
> val cassandraPassword = parameters.get("cassandra.password")
> val clusterBuilder = new ClusterBuilder() {
> override def buildCluster(builder: Cluster.Builder): Cluster = {
> cassandraServers.foreach(builder.addContactPoint)
> if (cassandraUser != null && cassandraPassword != null)
> builder.withCredentials(cassandraUser, cassandraPassword)
> builder.build()
>  }
> }
> {code}
> The job starts correctly but I'm not sure if the configuration from the 
> properties is pulled correctly on the taskmanagers, as I understand the 
> {{buildCluster}} call is done on their side. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106545012
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
--- End diff --

Check this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6098) Cassandra sink freezes after write error

2017-03-17 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929893#comment-15929893
 ] 

Chesnay Schepler commented on FLINK-6098:
-

It is very odd that the job does not fail/restart.

The CassandraSink appears to be behaving as expected: it detected an issue and 
threw the exception. Or rather, I assume it did, because there has to be some 
reason why the task is no longer running. The fact that the job doesn't 
fail, even though a task apparently failed, is a bit concerning.

> Cassandra sink freezes after write error
> 
>
> Key: FLINK-6098
> URL: https://issues.apache.org/jira/browse/FLINK-6098
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.2.0
> Environment: Flink 1.2.0, standalone cluster, Debian GNU/Linux 8.7 
> (jessie)
>Reporter: Jakub Nowacki
>
> I am having a problem with a very simple pipeline taking messages from Kafka 
> and writing them into Cassandra. The pipeline runs fine for a number of days 
> and at some point I get the below error in the taskmanager logs:
> {code}
> 2017-03-13 16:01:50,699 ERROR 
> org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase  - Error 
> while sending value.
> com.datastax.driver.core.exceptions.WriteTimeoutException: Cassandra timeout 
> during write query at consistency LOCAL_ONE (1 replica were required but only 
> 0 acknowledged the write)
> Caused by: com.datastax.driver.core.exceptions.WriteTimeoutException: 
> Cassandra timeout during write query at consistency LOCAL_ONE (1 replica were 
> required but only 0 acknowledged the write)
> {code}
> The job seems to be running fine, but it does not process any messages, which 
> is only visible in the metrics and in the JobManager log:
> {code}
> 2017-03-14 14:00:44,611 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
> checkpoint 42288 @ 1489496444610
> 2017-03-14 14:00:44,612 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
> checkpoint 42288 because of checkpoint decline from task 
> 35926157acfb1b68d1f6c9abcd90c8b4 : Task Source: Custom Source -> Map -> Map 
> -> Sink: Cassandra Sink (1/2) was not running
> {code}
> I know this is some Cassandra hiccup, but in theory the pipeline should recover 
> after a failure, or fail and stop.
> Everything seems fine and I didn't find any information specific to 
> reconnecting after a failure for the Cassandra Connector. The only thing I'm not 
> sure is done correctly is the ClusterBuilder; i.e. I use the code below 
> in the job definition's main method (in Scala):
> {code:java}
> val cassandraServers = parameters.get("cassandra.servers", 
> "localhost").split(",")
> val cassandraUser = parameters.get("cassandra.user")
> val cassandraPassword = parameters.get("cassandra.password")
> val clusterBuilder = new ClusterBuilder() {
> override def buildCluster(builder: Cluster.Builder): Cluster = {
> cassandraServers.foreach(builder.addContactPoint)
> if (cassandraUser != null && cassandraPassword != null)
> builder.withCredentials(cassandraUser, cassandraPassword)
> builder.build()
>  }
> }
> {code}
> The job starts correctly but I'm not sure if the configuration from the 
> properties is pulled correctly on the taskmanagers, as I understand the 
> {{buildCluster}} call is done on their side. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression

2017-03-17 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-6101:
---
Description: 
Currently the Table API does not support selecting GroupBy fields with an 
expression, either using the original field name or the expression.

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
caused
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
and 

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
{code}
will cause

{code}
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input 
[e, ('b % 3), TMP_0, TMP_1, TMP_2].
{code}

and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
{code}
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, 
b5, TMP_0, TMP_1, TMP_2]
{code}

the only way to get this to work seems to be:
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.select('a, 'b%3 as 'b, 'c, 'd, 'e)
.groupBy('e, 'b)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}

I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
a bit odd compared to SQL, but the Table API has a different groupBy grammar.)

What do you think?



  was:
Currently the Table API does not support selecting GroupBy fields with an 
expression, either using the original field name or the expression.

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
caused
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
and 

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
{code}
will cause

{code}
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input 
[e, ('b % 3), TMP_0, TMP_1, TMP_2].
{code}

and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
{code}
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, 
b5, TMP_0, TMP_1, TMP_2]
{code}

the only way to get this to work seems to be:
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.select('a, 'b%3 as 'b, 'c, 'd, 'e)
.groupBy('e, 'b)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}

I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
a bit odd compared to SQL, but the Table API has a different groupBy grammar.)





> GroupBy fields with expression can not be selected either using original name 
> or expression 
> 
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>
> Currently the Table API does not support selecting GroupBy fields with an 
> expression, either using the original field name or the expression.
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> caused
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> and 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
> {code}
> java.lang.IllegalArgumentException: field [b] not found; input fields are: 
> [e, b5, TMP_0, TMP_1, TMP_2]
> {code}
> the only way to get this to work seems to be:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
> a bit odd compared to SQL, but the Table API has a different groupBy grammar.)
> What do you think?



--

[jira] [Updated] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression

2017-03-17 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-6101:
---
Description: 
Currently the Table API does not support selecting GroupBy fields with an 
expression, either using the original field name or the expression.

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
caused
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}
and 

{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
{code}
will cause

{code}
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input 
[e, ('b % 3), TMP_0, TMP_1, TMP_2].
{code}

and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
{code}
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, 
b5, TMP_0, TMP_1, TMP_2]
{code}

the only way to get this to work seems to be:
{code}
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.select('a, 'b%3 as 'b, 'c, 'd, 'e)
.groupBy('e, 'b)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
{code}

I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
a bit odd compared to SQL, but the Table API has a different groupBy grammar.)




  was:
Currently the Table API does not support selecting GroupBy fields with an 
expression, either using the original field name or the expression.

```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
```
cause
```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
```
and 

```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
```
will cause

```
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input 
[e, ('b % 3), TMP_0, TMP_1, TMP_2].
```

and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
```
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, 
b5, TMP_0, TMP_1, TMP_2]
```

the only way to get this to work seems to be:
```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.select('a, 'b%3 as 'b, 'c, 'd, 'e)
.groupBy('e, 'b)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
```

I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
a bit odd compared to SQL, but the Table API has a different groupBy grammar.)





> GroupBy fields with expression can not be selected either using original name 
> or expression 
> 
>
> Key: FLINK-6101
> URL: https://issues.apache.org/jira/browse/FLINK-6101
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: lincoln.lee
>
> Currently the Table API does not support selecting GroupBy fields with an 
> expression, either using the original field name or the expression.
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> caused
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> and 
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .groupBy('e, 'b % 3)
> .select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> will cause
> {code}
> org.apache.flink.table.api.ValidationException: Cannot resolve [b] given 
> input [e, ('b % 3), TMP_0, TMP_1, TMP_2].
> {code}
> and adding an alias "group(e, 'b%3 as 'b)" still doesn't work:
> {code}
> java.lang.IllegalArgumentException: field [b] not found; input fields are: 
> [e, b5, TMP_0, TMP_1, TMP_2]
> {code}
> the only way to get this to work seems to be:
> {code}
>  val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 
> 'd, 'e)
> .select('a, 'b%3 as 'b, 'c, 'd, 'e)
> .groupBy('e, 'b)
> .select('b, 'c.min, 'e, 'a.avg, 'd.count)
> {code}
> I'm not sure: should we add support for aliases in the groupBy clause? (It seems 
> a bit odd compared to SQL, but the Table API has a different groupBy grammar.)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3166: [FLINK-3849] Add FilterableTableSource interface a...

2017-03-17 Thread tonycox
Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3166


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3392: [FLINK-5883] Re-adding the Exception-thrown code for List...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3392
  
+1
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3849) Add FilterableTableSource interface and translation rule

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929815#comment-15929815
 ] 

ASF GitHub Bot commented on FLINK-3849:
---

Github user tonycox closed the pull request at:

https://github.com/apache/flink/pull/3166


> Add FilterableTableSource interface and translation rule
> 
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Kurt Young
> Fix For: 1.3.0
>
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations 
> which support filter push-down.
> The interface could look as follows:
> {code}
> trait FilterableTableSource {
>   // returns unsupported predicate expression
>   def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a 
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak 
> the cost model as well to push the optimizer in the right direction.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106548270
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationTest.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+
+class DataStreamProcTimeBoundAggIntegrationTest extends 
StreamingWithStateTestBase {
--- End diff --

Please see the comments on PR #3547 regarding the tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630745
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param numGroupingKey The number of grouping keys.
+  * @param numAggregates The number of aggregates.
+  * @param finalRowArity The arity of the final output row.
+  */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
--- End diff --

We can do that in `open()` because it is just called once. However, in the 
hot path we should use `while` loops because Scala `for` loops have significant 
overhead.
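
For illustration, a tiny self-contained sketch of the difference (standalone methods with made-up names, not the actual window function code):

```scala
object WhileVsFor {
  // Hot-path style: a plain while loop avoids the Range and closure
  // overhead that a Scala for-comprehension introduces on every call.
  def sumWhile(values: Array[Double]): Double = {
    var acc = 0.0
    var i = 0
    while (i < values.length) {
      acc += values(i)
      i += 1
    }
    acc
  }

  // Equivalent for-comprehension: fine in open(), which runs once,
  // but measurably slower when invoked once per record.
  def sumFor(values: Array[Double]): Double = {
    var acc = 0.0
    for (i <- values.indices) acc += values(i)
    acc
  }
}
```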


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546899
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
+  extends AssignerWithPunctuatedWatermarks[Row] {
+
+override def checkAndGetNextWatermark(
+  lastElement: Row,
+  extractedTimestamp: Long): Watermark = {
+  null
+}
+
+override def extractTimestamp(
+  element: Row,
+  previousElementTimestamp: Long): Long = {
+  System.currentTimeMillis()
--- End diff --

`System.currentTimeMillis()` is not strictly increasing. For instance, 
the time of a machine can be adjusted by a time server. So we should always 
remember the last emitted timestamp and return the `max` of the last and the 
current time.
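
A minimal sketch of that pattern (the class and method names below are made up for illustration):

```scala
class MonotonicProcTime {
  private var lastTimestamp = Long.MinValue

  // max() shields against the wall clock jumping backwards, e.g. after an NTP sync.
  def nextTimestamp(): Long = {
    lastTimestamp = math.max(lastTimestamp, System.currentTimeMillis())
    lastTimestamp
  }
}
```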


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106545668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As we it is not possible to operate neither on sliding count neither
+// on sliding time we need to manage the eviction of the events that
+// expire ourselves based on the proctime (system time). Therefore the
+// current system time is assign as the timestamp of the event to be
+// recognize by the evictor
+
+val inputDataStreamTimed = inputDS
--- End diff --

We should not assign processing timestamps as event timestamps because it 
overrides existing timestamps.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106544854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -106,6 +137,10 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} else if 
(overWindow.lowerBound.getOffset.getType.isInstanceOf[IntervalSqlType]
--- End diff --

We can explicitly check if a window is a ROW or a RANGE window by 
`overWindow.isRows`. I'm also not sure about the 
`overWindow.upperBound.isPreceding` condition.

I think the condition should rather be like this:
```
else if (overWindow.lowerBound.isPreceding() && 
!overWindow.lowerBound.isUnbounded() && // bounded preceding
  overWindow.upperBound.isCurrentRow() && // until current row
  !overWindow.isRows) // is RANGE window
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630792
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param aggregates The aggregates to be computed
+  * @param aggFields the fields on which to apply the aggregate.
+  * @param forwardedFieldCount The fields to be carried from current row.
+  */
+class DataStreamProcTimeAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+  
+ 
+  /**
+* Calculate aggregated values output by aggregate buffer, and set them 
into output
+* Row based on the mapping relation between intermediate aggregate 
data and output data.
+*/
+  override def apply(
+  key: Tuple,
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+   var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+   
aggregates(i).resetAccumulator(accumulators.getField(i).asInstanceOf[Accumulator])
+   i += 1
+ }
+ var reuse:Row = null
--- End diff --

+space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As we it is not possible to operate neither on sliding count neither
+// on sliding time we need to manage the eviction of the events that
+// expire ourselves based on the proctime (system time). Therefore the
+// current system time is assign as the timestamp of the event to be
+// recognize by the evictor
+
+val inputDataStreamTimed = inputDS
+  .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  if (partitionKeys.nonEmpty) {
+inputDataStreamTimed.keyBy(partitionKeys:_*)
+  .window(GlobalWindows.create())
+  .trigger(CountTrigger.of(1))
+  .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
+  .apply(new 
DataStreamIncrementalAggregateWindowFunction[GlobalWindow]
--- End diff --

Let's use a process function. We have to change the code anyway once we 
want to support `FOLLOWING`. Also, a `ProcessFunction` does not need to 
aggregate all rows for each row; it can keep the accumulators of the last row, 
add the new row, and retract the old ones.
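
To make the accumulate/retract idea concrete, a simplified, self-contained sketch of the bookkeeping (plain Scala, not the actual Flink `ProcessFunction` API; `BoundedSumOverWindow` is an invented name):

```scala
import scala.collection.mutable

// Keep a running accumulator: add each new row once and retract rows that
// fall out of the time boundary, instead of re-aggregating the whole window.
class BoundedSumOverWindow(timeBoundaryMs: Long) {
  private val buffer = mutable.Queue.empty[(Long, Double)] // (timestamp, value)
  private var sum = 0.0                                     // running accumulator

  def process(timestamp: Long, value: Double): Double = {
    buffer.enqueue((timestamp, value))                      // accumulate the new row
    sum += value
    while (buffer.nonEmpty && buffer.head._1 <= timestamp - timeBoundaryMs) {
      sum -= buffer.dequeue()._2                            // retract expired rows
    }
    sum
  }
}
```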


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630541
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -785,7 +785,7 @@ object AggregateUtil {
 (propPos._1, propPos._2)
   }
 
-  private def transformToAggregateFunctions(
--- End diff --

We could also move the logic to create the processing function into 
`AggregateUtil`, like all other operators that have to deal with aggregations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106629913
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,66 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+var time_boundary = 0L
+ 
+logicWindow.constants.get(lowerboundIndex).getValue2 match {
+  case _: java.math.BigDecimal => time_boundary = 
logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+  case _ => throw new TableException("OVER Window boundaries must be 
numeric")
+}
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As we it is not possible to operate neither on sliding count neither
+// on sliding time we need to manage the eviction of the events that
+// expire ourselves based on the proctime (system time). Therefore the
+// current system time is assign as the timestamp of the event to be
+// recognize by the evictor
+
+val inputDataStreamTimed = inputDS
+  .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  if (partitionKeys.nonEmpty) {
+inputDataStreamTimed.keyBy(partitionKeys:_*)
+  .window(GlobalWindows.create())
+  .trigger(CountTrigger.of(1))
+  .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
+  .apply(new 
DataStreamProcTimeAggregateWindowFunction[GlobalWindow]
+   (aggregates,aggFields,inputType.getFieldCount))
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+
+  } else {
+  
inputDataStreamTimed.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1))
+.evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
   
+.apply(new 
DataStreamProcTimeAggregateGlobalWindowFunction[GlobalWindow](
--- End diff --

I think a `ProcessFunction` would be more efficient for this use case. With 
a `WindowFunction` and a `GlobalWindow`, we have to aggregate all rows for each 
emitted row. With a `ProcessFunction` we can just add the new row and retract 
those rows which are not included anymore.

Moreover, we have to change the logic to a `ProcessFunction` anyway when we 
extend this to support `FOLLOWING` 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
--- End diff --

We should not use a `TimestampExtractor` to assign processing time 
timestamps as event timestamps.
I think the `ProcTimeTimestampExtractor` does not need to be wrapped in an 
`object`. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106547382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
--- End diff --

Actually, accumulators can be created once in `open()` and be reused after 
they have been reset with `AggregateFunction.resetAccumulator()`.
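
E.g., just the relevant fragments of this class (a sketch of the reuse pattern, mirroring what the keyed window function in this PR already does with `resetAccumulator()`):

```scala
// Create the accumulators once in open() ...
override def open(parameters: Configuration): Unit = {
  output = new Row(forwardedFieldCount + aggregates.length)
  accumulators = new Row(aggregates.length)
  var i = 0
  while (i < aggregates.length) {
    accumulators.setField(i, aggregates(i).createAccumulator())
    i += 1
  }
}

// ... and only reset them at the start of each apply() call instead of re-creating them
var i = 0
while (i < aggregates.length) {
  aggregates(i).resetAccumulator(accumulators.getField(i).asInstanceOf[Accumulator])
  i += 1
}
```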


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106631183
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -17,23 +17,54 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import java.util.{ List => JList }
--- End diff --

Can you revert the `import` changes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106547467
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+ }
+ var reuse:Row = null
--- End diff --

`var reuse: Row` +space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3550: [FLINK-5654] - Add processing time OVER RANGE BETW...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630898
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationITCase.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+
+class DataStreamProcTimeBoundAggIntegrationITCase extends 
StreamingWithStateTestBase {
--- End diff --

Please see the comment on PR #3547 regarding the tests.
Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929811#comment-15929811
 ] 

ASF GitHub Bot commented on FLINK-5883:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3392
  
+1
Merging this...


> Re-adding the Exception-thrown code for ListKeyGroupedIterator when the 
> iterator is requested the second time
> -
>
> Key: FLINK-5883
> URL: https://issues.apache.org/jira/browse/FLINK-5883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>
> Originally, ListKeyGroupedIterator ensured that a TraversableOnceException 
> was thrown when the iterator was requested a second time (FLINK-1023). That 
> check was unexpectedly lost in FLINK-1110, so this change adds it back.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929795#comment-15929795
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630745
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param numGroupingKey The number of grouping keys.
+  * @param numAggregates The number of aggregates.
+  * @param finalRowArity The arity of the final output row.
+  */
+class DataStreamIncrementalAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
--- End diff --

We can do that in `open()` because it is just called once. However, in the 
hot path we should use `while` loops because Scala `for` loops have significant 
overhead.
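
I.e., in the per-record path something like the following (where `record` stands for the current input `Row`); a Scala `for (i <- 0 until n)` desugars into a `Range.foreach` call with a closure, which is why we avoid it in the hot path:

```scala
// hot path: index-based while loop, no closure allocation per record
var i = 0
while (i < aggregates.length) {
  aggregates(i).accumulate(
    accumulators.getField(i).asInstanceOf[Accumulator],
    record.getField(aggFields(i)))
  i += 1
}
```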


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929800#comment-15929800
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630792
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateWindowFunction.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.types.Row
+import org.apache.flink.configuration.Configuration
+import 
org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.functions.Accumulator
+
+/**
+  * Computes the final aggregate value from incrementally computed 
aggreagtes.
+  *
+  * @param aggregates The aggregates to be computed
+  * @param aggFields the fields on which to apply the aggregate.
+  * @param forwardedFieldCount The fields to be carried from current row.
+  */
+class DataStreamProcTimeAggregateWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+  extends RichWindowFunction[Row, Row, Tuple, W] {
+
+private var output: Row = _
+private var accumulators: Row= _
+
+  override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+  
+ 
+  /**
+* Calculate aggregated values output by aggregate buffer, and set them 
into output
+* Row based on the mapping relation between intermediate aggregate 
data and output data.
+*/
+  override def apply(
+  key: Tuple,
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+   var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+   
aggregates(i).resetAccumulator(accumulators.getField(i).asInstanceOf[Accumulator])
+   i += 1
+ }
+ var reuse:Row = null
--- End diff --

+space


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> 

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929806#comment-15929806
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106547467
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i += 1
+ }
+ var reuse:Row = null
--- End diff --

`var reuse: Row` +space


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this 

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929804#comment-15929804
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106544854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -106,6 +137,10 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} else if 
(overWindow.lowerBound.getOffset.getType.isInstanceOf[IntervalSqlType]
--- End diff --

We can explicitly check whether a window is a ROW or a RANGE window via 
`overWindow.isRows`. I'm also not sure about the 
`overWindow.upperBound.isPreceding` condition.

I think the condition should rather be like this:
```
else if (overWindow.lowerBound.isPreceding() && 
!overWindow.lowerBound.isUnbounded() && // bounded preceding
  overWindow.upperBound.isCurrentRow() && // until current row
  !overWindow.isRows) // is RANGE window
```


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929791#comment-15929791
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106547382
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataStreamProcTimeAggregateGlobalWindowFunction.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.aggregate
+
+import 
org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.functions.Accumulator
+
+import java.lang.Iterable
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.functions.AggregateFunction
+
+
+
+ //org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
+
+class DataStreamProcTimeAggregateGlobalWindowFunction[W <: Window](
+ private val aggregates: Array[AggregateFunction[_]],
+ private val aggFields: Array[Int],
+ private val forwardedFieldCount: Int)
+ extends RichAllWindowFunction[Row, Row, W] {
+  
+private var output: Row = _
+private var accumulators: Row= _
+ 
+
+ override def open(parameters: Configuration): Unit = {
+ output = new Row(forwardedFieldCount + aggregates.length)
+ accumulators = new Row(aggregates.length)
+ var i = 0
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
+i = i + 1
+ }
+  }
+   
+ override def apply(
+  window: W,
+  records: Iterable[Row],
+  out: Collector[Row]): Unit = {
+
+  
+ var i = 0
+ //initialize the values of the aggregators by re-creating them
+ //the design of the Accumulator interface should be extended to 
enable 
+ //a reset function for better performance
+ while (i < aggregates.length) {
+accumulators.setField(i, aggregates(i).createAccumulator())
--- End diff --

Actually, accumulators can be created once in `open()` and be reused after 
they have been reset with `AggregateFunction.resetAccumulator()`.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to 

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929803#comment-15929803
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106629136
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,66 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+var time_boundary = 0L
+ 
+logicWindow.constants.get(lowerboundIndex).getValue2 match {
--- End diff --

This can be changed to
```
val timeBoundary = logicWindow.constants.get(lowerboundIndex).getValue2 
match {
  case _: java.math.BigDecimal => logicWindow.constants.get(lowerboundIndex)
.getValue2.asInstanceOf[java.math.BigDecimal].longValue()
  case _ => throw new TableException("OVER Window boundaries must be 
numeric")
}
```


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929799#comment-15929799
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106548270
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationTest.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+
+class DataStreamProcTimeBoundAggIntegrationTest extends 
StreamingWithStateTestBase {
--- End diff --

Please see the comments on PR #3547 regarding the tests.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929801#comment-15929801
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106631183
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -17,23 +17,54 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import java.util.{ List => JList }
--- End diff --

Can you revert the `import` changes?


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929792#comment-15929792
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106545668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As we it is not possible to operate neither on sliding count neither
+// on sliding time we need to manage the eviction of the events that
+// expire ourselves based on the proctime (system time). Therefore the
+// current system time is assign as the timestamp of the event to be
+// recognize by the evictor
+
+val inputDataStreamTimed = inputDS
--- End diff --

We should not assign processing time timestamps as event timestamps, because 
doing so overrides any existing event timestamps.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929802#comment-15929802
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106545012
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
--- End diff --

Check this


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929794#comment-15929794
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630898
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/DataStreamProcTimeBoundAggIntegrationITCase.scala
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{ TableEnvironment, TableException }
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.{
+  StreamingWithStateTestBase,
+  StreamITCase,
+  StreamTestData
+}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+import scala.collection.mutable
+
+class DataStreamProcTimeBoundAggIntegrationITCase extends 
StreamingWithStateTestBase {
--- End diff --

Please see the comment on PR #3547 regarding the tests.
Thanks


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929805#comment-15929805
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546112
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,64 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// final long time_boundary =
+// 
Long.parseLong(windowReference.getConstants().get(1).getValue().toString());
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+val time_boundary =  logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As we it is not possible to operate neither on sliding count neither
+// on sliding time we need to manage the eviction of the events that
+// expire ourselves based on the proctime (system time). Therefore the
+// current system time is assign as the timestamp of the event to be
+// recognize by the evictor
+
+val inputDataStreamTimed = inputDS
+  .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  if (partitionKeys.nonEmpty) {
+inputDataStreamTimed.keyBy(partitionKeys:_*)
+  .window(GlobalWindows.create())
+  .trigger(CountTrigger.of(1))
+  .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
+  .apply(new 
DataStreamIncrementalAggregateWindowFunction[GlobalWindow]
--- End diff --

Let's use a process function. We have to change the code anyway once we 
want to support `FOLLOWING`. Also, a `ProcessFunction` does not need to 
aggregate all rows for each row; it can keep the accumulators of the last row, 
add the new row, and retract the expired ones.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929793#comment-15929793
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106630541
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -785,7 +785,7 @@ object AggregateUtil {
 (propPos._1, propPos._2)
   }
 
-  private def transformToAggregateFunctions(
--- End diff --

We could also move the logic to create the processing function into 
`AggregateUtil`, like all other operators that have to deal with aggregations.


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929797#comment-15929797
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106629913
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -119,6 +154,66 @@ class DataStreamOverAggregate(
 
   }
 
+  def createTimeBoundedProcessingTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row] = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+val index = 
overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex
+val count = input.getRowType().getFieldCount()
+val lowerboundIndex = index - count
+var time_boundary = 0L
+ 
+logicWindow.constants.get(lowerboundIndex).getValue2 match {
+  case _: java.math.BigDecimal => time_boundary = 
logicWindow.constants.get(lowerboundIndex)
+ .getValue2.asInstanceOf[java.math.BigDecimal].longValue()
+  case _ => throw new TableException("OVER Window boundaries must be 
numeric")
+}
+
+val (aggFields, aggregates) =  
AggregateUtil.transformToAggregateFunctions(
+ namedAggregates.map(_.getKey),inputType, needRetraction = false)
+ 
+ 
+// As it is not possible to operate on either a sliding count or a sliding
+// time window here, we need to manage the eviction of expired events
+// ourselves based on the proctime (system time). Therefore the current
+// system time is assigned as the timestamp of the event so that it is
+// recognized by the evictor.
+
+val inputDataStreamTimed = inputDS
+  .assignTimestampsAndWatermarks(new ProcTimeTimestampExtractor())
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  if (partitionKeys.nonEmpty) {
+inputDataStreamTimed.keyBy(partitionKeys:_*)
+  .window(GlobalWindows.create())
+  .trigger(CountTrigger.of(1))
+  .evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
+  .apply(new 
DataStreamProcTimeAggregateWindowFunction[GlobalWindow]
+   (aggregates,aggFields,inputType.getFieldCount))
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+
+  } else {
+  
inputDataStreamTimed.windowAll(GlobalWindows.create()).trigger(CountTrigger.of(1))
+.evictor(TimeEvictor.of(Time.milliseconds(time_boundary))) 
   
+.apply(new 
DataStreamProcTimeAggregateGlobalWindowFunction[GlobalWindow](
--- End diff --

I think a `ProcessFunction` would be more efficient for this use case. With 
a `WindowFunction` and a `GlobalWindow`, we have to aggregate all rows for each 
emitted row. With a `ProcessFunction` we can just add the new row and retract 
those rows which are not included anymore.

Moreover, we have to change the logic to a `ProcessFunction` anyway when we 
extend this to support `FOLLOWING` 


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING 

[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929796#comment-15929796
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546899
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
+  extends AssignerWithPunctuatedWatermarks[Row] {
+
+override def checkAndGetNextWatermark(
+  lastElement: Row,
+  extractedTimestamp: Long): Watermark = {
+  null
+}
+
+override def extractTimestamp(
+  element: Row,
+  previousElementTimestamp: Long): Long = {
+  System.currentTimeMillis()
--- End diff --

`System.currentTimeMillis()` is not strictly increasing. For instance 
the time of a machine can be synced by a timeserver. So we should always 
remember the last emitted timestamp and return the `max` of the last and the 
current time.
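
A minimal sketch of that suggestion (the class and field names are illustrative only):

{code}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.types.Row

// Assigns the current processing time as timestamp, but never lets the
// assigned timestamps go backwards when the machine clock is adjusted.
class MonotonicProcTimeExtractor extends AssignerWithPunctuatedWatermarks[Row] {

  private var lastTimestamp: Long = Long.MinValue

  override def checkAndGetNextWatermark(
      lastElement: Row,
      extractedTimestamp: Long): Watermark = null

  override def extractTimestamp(
      element: Row,
      previousElementTimestamp: Long): Long = {
    lastTimestamp = Math.max(lastTimestamp, System.currentTimeMillis())
    lastTimestamp
  }
}
{code}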


> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929798#comment-15929798
 ] 

ASF GitHub Bot commented on FLINK-5654:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3550#discussion_r106546529
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -191,3 +287,31 @@ class DataStreamOverAggregate(
 
 }
 
+object DataStreamProcTimeCase {
+  class ProcTimeTimestampExtractor
--- End diff --

We should not use a `TimestampExtractor` to assign processing time 
timestamps as event timestamps.
I think the `ProcTimeTimestampExtractor` does not need to be wrapped in an 
`object`. 



> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: radu
>
> The goal of this issue is to add support for OVER RANGE aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' 
> HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5995) Get a Exception when creating the ListStateDescriptor with a TypeInformation

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929776#comment-15929776
 ] 

ASF GitHub Bot commented on FLINK-5995:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Looks good, merging this...


> Get a Exception when creating the ListStateDescriptor with a TypeInformation 
> -
>
> Key: FLINK-5995
> URL: https://issues.apache.org/jira/browse/FLINK-5995
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> When using OperatorState and creating the ListStateDescriptor with a 
> TypeInformation, I got an exception. The exception info is:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:915)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:858)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Serializer not yet initialized.
>   at 
> org.apache.flink.api.common.state.StateDescriptor.getSerializer(StateDescriptor.java:169)
>   at 
> org.apache.flink.api.common.state.ListStateDescriptor.getElementSerializer(ListStateDescriptor.java:93)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:110)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getOperatorState(DefaultOperatorStateBackend.java:91)
>   at 
> org.apache.flink.table.runtime.aggregate.UnboundedNonPartitionedProcessingOverProcessFunction.initializeState(UnboundedNonPartitionedProcessingOverProcessFunction.scala:104)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:242)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:681)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:669)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:670)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> So, I'll add a `stateDescriptor.initializeSerializerUnlessSet()` call in the 
> `getOperatorState` method. I would appreciate it if anyone could give me some advice.
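
Until that fix lands in the backend, a user-side workaround sketch is to initialize the descriptor's serializer before requesting the state (the helper name and the explicit ExecutionConfig parameter are made up for illustration):

{code}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.types.Row

object OperatorStateWorkaround {

  // Initializes the descriptor's serializer up front so that
  // DefaultOperatorStateBackend#getOperatorState does not fail with
  // "Serializer not yet initialized".
  def getInitializedListState(
      context: FunctionInitializationContext,
      name: String,
      typeInfo: TypeInformation[Row],
      executionConfig: ExecutionConfig): ListState[Row] = {

    val descriptor = new ListStateDescriptor[Row](name, typeInfo)
    descriptor.initializeSerializerUnlessSet(executionConfig)
    context.getOperatorStateStore.getOperatorState(descriptor)
  }
}
{code}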



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3503: [FLINK-5995][checkpoints] fix Get a Exception when creati...

2017-03-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3503
  
Looks good, merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929775#comment-15929775
 ] 

ASF GitHub Bot commented on FLINK-5981:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3486


> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
> Fix For: 1.3.0
>
>
> I configured SSL and started a Flink job, but found that the configured 
> properties were not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match 
> what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version; it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; it seems only an akka 
> upgrade can solve the issue on the akka side (we'll consider that later, as an 
> upgrade is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5981.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed via e0614f6551a232706b74963563694486fe2461b1

> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
> Fix For: 1.3.0
>
>
> I configured SSL and started a Flink job, but found that the configured 
> properties were not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match 
> what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version; it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; it seems only an akka 
> upgrade can solve the issue on the akka side (we'll consider that later, as an 
> upgrade is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5981) SSL version and ciper suites cannot be constrained as configured

2017-03-17 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5981.
---

> SSL version and ciper suites cannot be constrained as configured
> 
>
> Key: FLINK-5981
> URL: https://issues.apache.org/jira/browse/FLINK-5981
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Tao Wang
>Assignee: Tao Wang
> Fix For: 1.3.0
>
>
> I configured SSL and started a Flink job, but found that the configured 
> properties were not applied properly:
> akka port: only the cipher suites are applied correctly, the SSL version is not
> blob server/netty server: neither the SSL version nor the cipher suites match 
> what I configured
> I've found out the reason why:
> http://stackoverflow.com/questions/11504173/sslcontext-initialization (for 
> blob server and netty server)
> https://groups.google.com/forum/#!topic/akka-user/JH6bGnWE8kY (for the akka SSL 
> version; it's fixed in akka 2.4: https://github.com/akka/akka/pull/21078)
> I'll fix the issue on the blob server and netty server; it seems only an akka 
> upgrade can solve the issue on the akka side (we'll consider that later, as an 
> upgrade is not a small action).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3486: [FLINK-5981][SECURITY]make ssl version and cipher ...

2017-03-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3486


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929768#comment-15929768
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
@StefanRRichter I think I was faster ...  


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> 
> stateDescriptor);
>  <T extends Serializable> ListState<T> 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.
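
A hedged usage sketch of proposal 1 (the `getBroadcastOperatorState` method is only proposed above and is not an existing public API; the class, the state name, and the partition-tracking scenario are made up for illustration):

{code}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// Sketch of how a source-like function could use the proposed broadcast
// (union) operator state to restore the complete partition assignment on
// every subtask after a rescale.
class PartitionTrackingFunction extends CheckpointedFunction {

  @transient private var partitionState: ListState[java.lang.Integer] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[java.lang.Integer](
      "discovered-partitions", BasicTypeInfo.INT_TYPE_INFO)
    // proposed API: on restore, every subtask sees the union of all entries
    partitionState = context.getOperatorStateStore.getBroadcastOperatorState(descriptor)
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // each subtask checkpoints only the partitions it currently owns
    partitionState.clear()
    partitionState.add(0)
  }
}
{code}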



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
@StefanRRichter I think I was faster ... 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6102) Update protobuf to latest version

2017-03-17 Thread Su Ralph (JIRA)
Su Ralph created FLINK-6102:
---

 Summary: Update protobuf to latest version
 Key: FLINK-6102
 URL: https://issues.apache.org/jira/browse/FLINK-6102
 Project: Flink
  Issue Type: Task
  Components: Core
Affects Versions: 1.2.0
Reporter: Su Ralph


In Flink release 1.2.0, we have protobuf-java 2.5.0, and it is packaged into the 
Flink fat jar.

This can cause conflicts when a user application uses a newer version of 
protobuf-java, so it makes more sense to update to a later protobuf-java.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-17 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3508
  
I wonder if there could also exist a case for broadcasting operator state 
(non-keyed), where only one operator instance is selected as sender and all 
others receive on restore. Furthermore, the union aspect may (or may not) 
happen at restore time, but not at the time that a user requests this state. 
For what this currently does, I think `ReplicatingState` describes it pretty 
well. Broadcast would be a good description from the operator's perspective: it 
broadcasts the generated data to all peers on restore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929756#comment-15929756
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/3508
  
I wonder if there could also exist a case for broadcasting operator state 
(non-keyed), where only one operator instance is selected as sender and all 
others receive on restore. Furthermore, the union aspect may (or may not) 
happen at restore time, but not at the time that a user requests this state. 
For what this currently does, I think `ReplicatingState` describes it pretty 
well. Broadcast would be a good description from the operator's perspective: it 
broadcasts the generated data to all peers on restore.


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
>  <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> 
> stateDescriptor);
>  <T extends Serializable> ListState<T> 
> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state 
> type as either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929753#comment-15929753
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106627390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -17,34 +17,41 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
--- End diff --

No worries. I know that IDEs tend to reformat code but it really makes 
reviews harder.
Thanks!


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106627390
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -17,34 +17,41 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{ RelOptCluster, RelTraitSet }
--- End diff --

No worries. I know that IDEs tend to reformat code but it really makes 
reviews harder.
Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5653) Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929751#comment-15929751
 ] 

ASF GitHub Bot commented on FLINK-5653:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106627282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -106,9 +113,14 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} // lowerbound is a BasicType and upperbound is PRECEDING or 
CURRENT ROW
+else if 
(overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType]
--- End diff --

Yes, I realized that when looking at PR #3550 that 
`isInstanceOf[BasicSqlType]` and `.isInstanceOf[IntervalSqlType]` distinguishes 
ROW from RANGE windows. I think using `.isRows()` is more clear and might also 
be safer because it appears to be a more public API than the type of the offset.
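
A tiny illustrative sketch of that distinction (assuming Calcite's `Window.Group` with its public `isRows` flag; the helper itself is made up):

{code}
import org.apache.calcite.rel.core.Window

object OverWindowKind {
  // Prefer the group's public isRows flag over inspecting whether the boundary
  // offset's type is a BasicSqlType or an IntervalSqlType.
  def isRowsWindow(group: Window.Group): Boolean = group.isRows
}
{code}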


> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on 
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING 
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3547: [FLINK-5653] Add processing time OVER ROWS BETWEEN...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3547#discussion_r106627282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -106,9 +113,14 @@ class DataStreamOverAggregate(
 if (overWindow.lowerBound.isUnbounded &&
   overWindow.upperBound.isCurrentRow) {
   createUnboundedAndCurrentRowProcessingTimeOverWindow(inputDS)
+} // lowerbound is a BasicType and upperbound is PRECEDING or 
CURRENT ROW
+else if 
(overWindow.lowerBound.getOffset.getType.isInstanceOf[BasicSqlType]
--- End diff --

Yes, I realized that when looking at PR #3550 that 
`isInstanceOf[BasicSqlType]` and `.isInstanceOf[IntervalSqlType]` distinguishes 
ROW from RANGE windows. I think using `.isRows()` is more clear and might also 
be safer because it appears to be a more public API than the type of the offset.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929740#comment-15929740
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106621564
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -91,6 +91,35 @@ object AggregateUtil {
   }
 
   /**
+* Create a [[ProcessFunction]] to evaluate the final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedProcessingOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(
+   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+   inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+val (aggFields, aggregates) =
+  transformToAggregateFunctions(
+namedAggregates.map(_.getKey),
+inputType,
+needRetraction = false)
+
+val aggregationStateType: RowTypeInfo =
--- End diff --

Also you can use `createAccumulatorRowType(inputType, aggregates)`.

Btw. could you refactor the `createAccumulatorRowType(inputType, 
aggregates)` method and remove the `inputType` parameter? It is not used. 
Thanks!


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929742#comment-15929742
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106620612
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -159,6 +167,46 @@ class DataStreamOverAggregate(
 result
   }
 
+  def createUnboundedAndCurrentRowEventTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row]  = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val keyedStream = inputDS.keyBy(partitionKeys: _*)
+val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
+  namedAggregates,
+  inputType)
+
+keyedStream
+  .process(processFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+  // global non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
+  namedAggregates,
+  inputType)
+
+inputDS.keyBy(new NullByteKeySelector[Row])
+  .process(processFunction)
+  .setParallelism(1)
--- End diff --

also `setMaxParallelism(1)` to prevent that this operator can be scaled out.


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929738#comment-15929738
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106622479
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row type which is saved in the state
+  * @param inputType the input row type which is saved in the state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Process one element from the input stream, not emit the output
+*
+* @param value The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929741#comment-15929741
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623704
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row type which is saved in the state
+  * @param inputType the input row type which is saved in the state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Process one element from the input stream, not emit the output
+*
+* @param value The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard later record
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure every key just register on timer
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929745#comment-15929745
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623012
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the input fields count
+  * @param intermediateType the intermediate row type which is saved in the state
+  * @param inputType the input row type which is saved in the state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Process one element from the input stream, not emit the output
+*
+* @param value The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929746#comment-15929746
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106622821
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929733#comment-15929733
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106625858
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT a, b, c, " +
+  "SUM(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "count(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "avg(a) over (" +
--- End diff --

Also, most groups have just a single record. The max is two records. With 
that we cannot really check if the sorting works correctly. Can you make fewer 
groups (fewer distinct `a` values) and add more rows for some groups with 
out-of-order timestamps?
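
A minimal sketch of what such test data could look like (hypothetical values, not from the PR): only two distinct `a` keys, several rows per key, and event timestamps that are deliberately out of order within each key, so that the sort in the over-window operator is actually exercised.

{code}
object OutOfOrderTestDataSketch {
  // Hypothetical rows shaped as (eventTime, a, b, c): two keys only, with
  // out-of-order timestamps inside each key.
  val data: Seq[(Long, Int, Long, String)] = Seq(
    (1000L, 1, 10L, "Hello"),
    (3000L, 1, 30L, "Hello"),
    (2000L, 1, 20L, "Hello"),   // arrives after ts = 3000 within key 1
    (1500L, 2, 15L, "World"),
    (4000L, 2, 40L, "World"),
    (2500L, 2, 25L, "World")    // arrives after ts = 4000 within key 2
  )

  def main(args: Array[String]): Unit = data.foreach(println)
}
{code}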


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929743#comment-15929743
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623827
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929739#comment-15929739
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106624514
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929736#comment-15929736
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106626434
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
--- End diff --

Can you also add a few unit tests to 
`org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala` to 
verify that the query is correctly translated? Thanks!


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
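
For intuition only (illustrative numbers, not taken from the issue text): with a RANGE frame the bound is defined on the ORDER BY value, so rows that share a rowtime are peers and receive the same aggregate. A tiny pure-Scala model of SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):

{code}
object RangeFrameSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq((1L, 1), (2L, 2), (2L, 3), (3L, 4))   // (rowtime, b)
    // For each row, sum every b whose rowtime is <= the current row's rowtime.
    val sums = rows.map { case (ts, _) => rows.filter(_._1 <= ts).map(_._2).sum }
    println(sums)   // List(1, 6, 6, 10): the two rows at rowtime 2 are peers
  }
}
{code}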



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929744#comment-15929744
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106617971
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,14 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded &&
+  overWindow.upperBound.isCurrentRow) {
--- End diff --

move this condition into the line above?


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929734#comment-15929734
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106624582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929735#comment-15929735
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623601
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  

[jira] [Commented] (FLINK-5658) Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929737#comment-15929737
 ] 

ASF GitHub Bot commented on FLINK-5658:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106625475
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT a, b, c, " +
+  "SUM(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "count(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "avg(a) over (" +
--- End diff --

Computing `avg`, `max`, `min` on the partition key is not very meaningful. 
Can you compute those on `b`?
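
One possible shape for the revised query (a sketch only, not the PR's final test; the table name `T1` and the exact column choice are placeholders): keep the partitioning on `a` but compute the aggregates on `b`.

{code}
object RevisedOverQuerySketch {
  val sqlQuery: String =
    "SELECT a, b, c, " +
    "SUM(b) OVER (PARTITION BY a ORDER BY rowtime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
    "COUNT(b) OVER (PARTITION BY a ORDER BY rowtime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
    "AVG(b) OVER (PARTITION BY a ORDER BY rowtime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
    "MAX(b) OVER (PARTITION BY a ORDER BY rowtime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
    "MIN(b) OVER (PARTITION BY a ORDER BY rowtime() RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
    "FROM T1"

  def main(args: Array[String]): Unit = println(sqlQuery)
}
{code}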


> Add event time OVER RANGE BETWEEN UNBOUNDED PRECEDING aggregation to SQL
> 
>
> Key: FLINK-5658
> URL: https://issues.apache.org/jira/browse/FLINK-5658
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Yuhong Hong
>
> The goal of this issue is to add support for OVER RANGE aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() RANGE BETWEEN UNBOUNDED 
> PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates processing time mode.
> - bounded PRECEDING is not supported (see FLINK-5655)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106620612
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -159,6 +167,46 @@ class DataStreamOverAggregate(
 result
   }
 
+  def createUnboundedAndCurrentRowEventTimeOverWindow(
+inputDS: DataStream[Row]): DataStream[Row]  = {
+
+val overWindow: Group = logicWindow.groups.get(0)
+val partitionKeys: Array[Int] = overWindow.keys.toArray
+val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = 
generateNamedAggregates
+
+// get the output types
+val rowTypeInfo = 
FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+val result: DataStream[Row] =
+  // partitioned aggregation
+  if (partitionKeys.nonEmpty) {
+val keyedStream = inputDS.keyBy(partitionKeys: _*)
+val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
+  namedAggregates,
+  inputType)
+
+keyedStream
+  .process(processFunction)
+  .returns(rowTypeInfo)
+  .name(aggOpName)
+  .asInstanceOf[DataStream[Row]]
+  }
+  // global non-partitioned aggregation
+  else {
+val processFunction = 
AggregateUtil.CreateUnboundedEventTimeOverProcessFunction(
+  namedAggregates,
+  inputType)
+
+inputDS.keyBy(new NullByteKeySelector[Row])
+  .process(processFunction)
+  .setParallelism(1)
--- End diff --

also `setMaxParallelism(1)` to prevent this operator from being scaled out.
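
A minimal, self-contained sketch of that suggestion (a trivial key selector and map stand in for the actual NullByteKeySelector and over-window process function; assumes the Flink Scala DataStream API):

{code}
import org.apache.flink.streaming.api.scala._

object SingleTaskOverWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Global (non-partitioned) aggregation: route everything to one key and
    // run the operator with parallelism 1; capping the max parallelism as
    // well prevents the operator from ever being scaled out.
    val result = env.fromElements(1, 2, 3)
      .keyBy(_ => 0)        // stand-in for the NullByteKeySelector
      .map(x => x)          // stand-in for .process(processFunction)
      .setParallelism(1)
      .setMaxParallelism(1)

    result.print()
    env.execute("single-task over-window sketch")
  }
}
{code}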


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106625858
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT a, b, c, " +
+  "SUM(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "count(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "avg(a) over (" +
--- End diff --

Also, most groups have just a single record. The max is two records. With 
that we cannot really check if the sorting works correctly. Can you make fewer 
groups (fewer distinct `a` values) and add more rows for some groups with 
out-of-order timestamps?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623827
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106622821
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106621564
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -91,6 +91,35 @@ object AggregateUtil {
   }
 
   /**
+* Creates a [[ProcessFunction]] to evaluate the final aggregate value.
+*
+* @param namedAggregates List of calls to aggregate functions and 
their output field names
+* @param inputType Input row type
+* @return [[UnboundedEventTimeOverProcessFunction]]
+*/
+  private[flink] def CreateUnboundedEventTimeOverProcessFunction(
+   namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+   inputType: RelDataType): UnboundedEventTimeOverProcessFunction = {
+
+val (aggFields, aggregates) =
+  transformToAggregateFunctions(
+namedAggregates.map(_.getKey),
+inputType,
+needRetraction = false)
+
+val aggregationStateType: RowTypeInfo =
--- End diff --

Also you can use `createAccumulatorRowType(inputType, aggregates)`.

Btw. could you refactor the `createAccumulatorRowType(inputType, 
aggregates)` method and remove the `inputType` parameter? It is not used. 
Thanks!
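
A rough sketch of the shape after that refactoring (the `HasAccumulatorType` trait and its `accumulatorType` member are hypothetical stand-ins for however the accumulator type is actually obtained from an aggregate; this is not the PR's real code):

{code}
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

object AccumulatorRowTypeSketch {

  // Hypothetical stand-in for an aggregate that knows its accumulator's type.
  trait HasAccumulatorType {
    def accumulatorType: TypeInformation[_]
  }

  // After dropping the unused inputType parameter, the accumulator row type
  // is derived from the aggregates alone.
  def createAccumulatorRowType(aggregates: Array[HasAccumulatorType]): RowTypeInfo =
    new RowTypeInfo(aggregates.map(_.accumulatorType): _*)

  def main(args: Array[String]): Unit = {
    val longAcc: HasAccumulatorType = new HasAccumulatorType {
      def accumulatorType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
    }
    println(createAccumulatorRowType(Array(longAcc)))
  }
}
{code}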


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623601
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; output is emitted later,
+* when the registered event-time timer fires.
+*
+* @param input The input value.
+* @param ctx   The ctx to register timer or get current time
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (timestamps behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that only one timer is registered per key
+  
ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623012
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes used by the aggregate functions
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type of the saved state
+  * @param inputType the input row type of the rows saved in state
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; it does not emit any output.
+*
+* @param input The input value.
+* @param ctx   The context to register timers or query the current time.
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that each key registers at most one timer per watermark
+  ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106617971
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -112,7 +113,14 @@ class DataStreamOverAggregate(
   "condition.")
 }
   case _: RowTimeType =>
-throw new TableException("OVER Window of the EventTime type is not 
currently supported.")
+if (overWindow.lowerBound.isUnbounded &&
+  overWindow.upperBound.isCurrentRow) {
--- End diff --

move this condition into the line above?
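
For what it's worth, the merged condition might read roughly like this (just a sketch, line length permitting):

```
case _: RowTimeType =>
  if (overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
    // translate to the unbounded event-time over-aggregate
  }
```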


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106624514
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type which the state stores
+  * @param inputType the input row type which the state stores
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; it does not emit any output.
+*
+* @param input The input value.
+* @param ctx   The context to register timers or query the current time.
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that each key registers at most one timer per watermark
+  ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106626434
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
--- End diff --

Can you also add a few unit tests to 
`org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala` to 
verify that the query is correctly translated? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106624582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type which the state stores
+  * @param inputType the input row type which the state stores
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; it does not emit any output.
+*
+* @param input The input value.
+* @param ctx   The context to register timers or query the current time.
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that each key registers at most one timer per watermark
+  ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106625475
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
 ---
@@ -317,4 +320,119 @@ class SqlITCase extends StreamingWithStateTestBase {
 result.addSink(new StreamITCase.StringSink)
 env.execute()
   }
+
+  /** test sliding event-time unbounded window with partition by **/
+  @Test
+  def testUnboundedEventTimeRowWindowWithPartition(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+env.setParallelism(1)
+
+val sqlQuery = "SELECT a, b, c, " +
+  "SUM(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "count(a) over (" +
+  "partition by a order by rowtime() range between unbounded preceding 
and current row), " +
+  "avg(a) over (" +
--- End diff --

Computing `avg`, `max`, `min` on the partition key is not very meaningful. 
Can you compute those on `b`?
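
For reference, the reworked OVER clauses might look like this (a sketch only; it assumes `b` is a numeric field, that the `max`/`min` clauses follow the same pattern, and that the source table is registered as `T1`, which is not shown in the quoted diff):

```
val sqlQuery = "SELECT a, b, c, " +
  "SUM(a) over (" +
  "partition by a order by rowtime() range between unbounded preceding and current row), " +
  "count(a) over (" +
  "partition by a order by rowtime() range between unbounded preceding and current row), " +
  "avg(b) over (" +
  "partition by a order by rowtime() range between unbounded preceding and current row), " +
  "max(b) over (" +
  "partition by a order by rowtime() range between unbounded preceding and current row), " +
  "min(b) over (" +
  "partition by a order by rowtime() range between unbounded preceding and current row) " +
  "FROM T1"
```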


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106622479
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type which the state stores
+  * @param inputType the input row type which the state stores
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; it does not emit any output.
+*
+* @param input The input value.
+* @param ctx   The context to register timers or query the current time.
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that each key registers at most one timer per watermark
+  ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
--- End diff --

we should register the timer based on the record timestamp: 
`ctx.timerService.registerEventTimeTimer(ctx.timestamp + 1)`
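
Applied to the quoted `processElement`, that suggestion would look roughly like this (a sketch; the rest of the method is unchanged from the PR):

```
override def processElement(
    input: Row,
    ctx: ProcessFunction[Row, Row]#Context,
    out: Collector[Row]): Unit = {

  // only buffer records that are not behind the current watermark
  if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
    // fire just after the record's timestamp instead of just after the watermark
    ctx.timerService.registerEventTimeTimer(ctx.timestamp + 1)
    rowState.add(new Tuple2(ctx.timestamp, input))
  }
}
```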


---

[GitHub] flink pull request #3386: [FLINK-5658][table] support unbounded eventtime ov...

2017-03-17 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3386#discussion_r106623704
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala
 ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import java.util
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.types.Row
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.util.{Collector, Preconditions}
+import org.apache.flink.api.common.state._
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.operators.TimestampedCollector
+import org.apache.flink.table.functions.{Accumulator, AggregateFunction}
+
+
+/**
+  * A ProcessFunction to support unbounded event-time over-window
+  *
+  * @param aggregates the aggregate functions
+  * @param aggFields  the field indexes which the aggregate functions use
+  * @param forwardedFieldCount the number of forwarded input fields
+  * @param intermediateType the intermediate row type which the state stores
+  * @param inputType the input row type which the state stores
+  *
+  */
+class UnboundedEventTimeOverProcessFunction(
+private val aggregates: Array[AggregateFunction[_]],
+private val aggFields: Array[Int],
+private val forwardedFieldCount: Int,
+private val intermediateType: TypeInformation[Row],
+private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row]{
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowState: ListState[Tuple2[Long, Row]] = _
+
+
+  override def open(config: Configuration) {
+output = new Row(forwardedFieldCount + aggregates.length)
+val valueSerializer: TypeSerializer[Row] =
+  
intermediateType.createSerializer(getRuntimeContext.getExecutionConfig)
+val stateDescriptor: ValueStateDescriptor[Row] =
+  new ValueStateDescriptor[Row]("accumulatorstate", valueSerializer)
+accumulatorState = getRuntimeContext.getState[Row](stateDescriptor)
+
+val tupleSerializer: TypeSerializer[Tuple2[Long, Row]] =
+  (new TupleTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, 
inputType)).createSerializer(
+
getRuntimeContext.getExecutionConfig).asInstanceOf[TypeSerializer[Tuple2[Long, 
Row]]]
+val tupleStateDescriptor: ListStateDescriptor[Tuple2[Long, Row]] =
+  new ListStateDescriptor[Tuple2[Long, Row]]("rowliststate", 
tupleSerializer)
+rowState = getRuntimeContext.getListState[Tuple2[Long, 
Row]](tupleStateDescriptor)
+
+  }
+
+  /**
+* Processes one element from the input stream; it does not emit any output.
+*
+* @param input The input value.
+* @param ctx   The context to register timers or query the current time.
+* @param out   The collector for returning result values.
+*
+*/
+  override def processElement(
+ input: Row,
+ ctx:  ProcessFunction[Row, Row]#Context,
+ out: Collector[Row]): Unit = {
+
+// discard late records (behind the current watermark)
+if (ctx.timestamp() >= ctx.timerService().currentWatermark()) {
+  // ensure that each key registers at most one timer per watermark
+  ctx.timerService.registerEventTimeTimer(ctx.timerService.currentWatermark + 1)
+
+  rowState.add(new Tuple2(ctx.timestamp, input))
+}
+  }
+
+  /**
+* Called when a timer set fires, sort current 

[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929731#comment-15929731
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
+1 on not exposing the Java Serialisation shortcut, btw. I was very unhappy 
that we even have it for the normal operator state.  



> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.
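
For context, a minimal sketch of how a CheckpointedFunction could use the proposed method (the method name follows the proposal above; the class and state names here are made up for illustration only):

{code}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

class OffsetsFunction extends CheckpointedFunction {

  // on restore, every parallel subtask receives the union of all snapshotted entries
  @transient private var offsets: ListState[java.lang.Long] = _

  override def initializeState(context: FunctionInitializationContext): Unit = {
    offsets = context.getOperatorStateStore.getBroadcastOperatorState(
      new ListStateDescriptor[java.lang.Long]("offsets", classOf[java.lang.Long]))
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    offsets.clear()
    offsets.add(42L) // each subtask adds its own entries
  }
}
{code}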



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
+1 on not exposing the Java Serialisation shortcut, btw. I was very unhappy 
that we even have it for the normal operator state. 😃 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...

2017-03-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
I would like us to take some time and think about the name. We are about to 
introduce a thing called "broadcast state" somewhat soon in the effort to make 
streaming joins possible. This broadcast state will provide an interface very 
similar to the current keyed state (we'll probably reuse the descriptors and 
state interfaces) but be checkpointed only on one operator because we only 
allow modifications based on broadcast input.

I propose to rename the state we're talking about here to `UnionState` (or 
something similar) because what it does is take the snapshotted state from all 
operators and, when restoring, send the union of that to all operators.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929730#comment-15929730
 ] 

ASF GitHub Bot commented on FLINK-5991:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/3508
  
I would like us to take some time and think about the name. We are about to 
introduce a thing called "broadcast state" somewhat soon in the effort to make 
streaming joins possible. This broadcast state will provide an interface very 
similar to the current keyed state (we'll probably reuse the descriptors and 
state interfaces) but be checkpointed only on one operator because we only 
allow modifications based on broadcast input.

I propose to rename the state we're talking about here to `UnionState` (or 
something similar) because what it does is take the snapshotted state from all 
operators and, when restoring, send the union of that to all operators.


> Expose Broadcast Operator State through public APIs
> ---
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> The broadcast operator state functionality was added in FLINK-5265, it just 
> hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on 
> broadcast state: rescalable Kinesis / Kafka consumers with shard / partition 
> discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast 
> state for the 1.3 release also.
> This JIRA also serves the purpose to discuss how we want to expose it.
> To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the 
> {{OperatorStateStore}} interface:
> {code}
> <S> ListState<S> getBroadcastOperatorState(ListStateDescriptor<S> stateDescriptor);
> <T extends Serializable> ListState<T> getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a 
> separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define the list state type as 
> either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if 
> we can rely on a contract that the value doesn't change. Or we expose a 
> defining method (e.g. {{getListStateType()}}) that is called only once in the 
> operator. This would break user code, but can be considered because it is 
> marked as {{PublicEvolving}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6101) GroupBy fields with expression can not be selected either using original name or expression

2017-03-17 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-6101:
--

 Summary: GroupBy fields with expression can not be selected either 
using original name or expression 
 Key: FLINK-6101
 URL: https://issues.apache.org/jira/browse/FLINK-6101
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: lincoln.lee


Currently the Table API does not support selecting groupBy fields that are defined 
with an expression, either by the original field name or by the expression itself.

```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
```
and 

```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.groupBy('e, 'b % 3)
.select('b%3, 'c.min, 'e, 'a.avg, 'd.count)
```
will cause

```
org.apache.flink.table.api.ValidationException: Cannot resolve [b] given input 
[e, ('b % 3), TMP_0, TMP_1, TMP_2].
```

and adding an alias, "groupBy('e, 'b%3 as 'b)", still doesn't work:
```
java.lang.IllegalArgumentException: field [b] not found; input fields are: [e, 
b5, TMP_0, TMP_1, TMP_2]
```

The only way to make this work is:
```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 
'e)
.select('a, 'b%3 as 'b, 'c, 'd, 'e)
.groupBy('e, 'b)
.select('b, 'c.min, 'e, 'a.avg, 'd.count)
```

I'm not sure: should we add support for aliases in the groupBy clause? (It seems a 
bit odd compared to SQL, but the Table API already has a different groupBy grammar.)
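
For illustration, a hypothetical alias-in-groupBy variant could look like this (not supported today; shown only to make the question concrete):

```
 val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
.groupBy('e, 'b % 3 as 'bMod)
.select('bMod, 'c.min, 'e, 'a.avg, 'd.count)
```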






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6058) Don't read DEFAULT_PARALLELISM from GlobalConfiguration

2017-03-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15929719#comment-15929719
 ] 

ASF GitHub Bot commented on FLINK-6058:
---

GitHub user fanyon opened a pull request:

https://github.com/apache/flink/pull/3561

[FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment

fix read DEFAULT_PARALLELISM from ContextEnvironment

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fanyon/flink FLINK-6058

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3561


commit 446cec49e1aa75c4258c07994d0c5ae18ea0c128
Author: mengji.fy 
Date:   2017-03-17T10:33:34Z

[FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment




> Don't read DEFAULT_PARALLELISM from GlobalConfiguration
> ---
>
> Key: FLINK-6058
> URL: https://issues.apache.org/jira/browse/FLINK-6058
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.3.0
>
>
> In the constructor of {{StreamContextEnvironment}} we read the 
> {{DEFAULT_PARALLELISM}} from the {{GlobalConfiguration}}. This assumes that 
> the environment variables are correctly set and can lead to problems. We 
> should read the default parallelism in the client and set it in the 
> {{ContextEnvironment}} that it creates. This can then be read by the 
> {{StreamContextEnvironment}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3561: [FLINK-6058] fix read DEFAULT_PARALLELISM from Con...

2017-03-17 Thread fanyon
GitHub user fanyon opened a pull request:

https://github.com/apache/flink/pull/3561

[FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment

fix read DEFAULT_PARALLELISM from ContextEnvironment

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fanyon/flink FLINK-6058

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3561.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3561


commit 446cec49e1aa75c4258c07994d0c5ae18ea0c128
Author: mengji.fy 
Date:   2017-03-17T10:33:34Z

[FLINK-6058] fix read DEFAULT_PARALLELISM from ContextEnvironment




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

