[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2017-04-20 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978103#comment-15978103
 ] 

Chesnay Schepler commented on FLINK-5095:
-

It's less maintainable. By introducing a set of generic primitives we can add 
more metric types without having to change all reporters. Take a timer, for 
example: if we introduced one now, not a single existing reporter could report 
it, and every one of them would have to be updated. In contrast, if the timer 
were just another generic "NumberMetric", it would be instantly supported.

Furthermore, we would no longer have the generic Gauge type, which is a pain 
to deal with: if you can't handle non-numeric gauges, you cannot simply ignore 
them in notify(), because a gauge may return null and you cannot check the 
type of null. You can only do that (somewhat reliably) in report().
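
The null problem can be made concrete with a minimal self-contained sketch (the Metric/Gauge types below are simplified stand-ins for illustration, not Flink's actual classes): at registration time a gauge's value may be null, so an instanceof check on the value cannot classify the gauge.

```java
public class GaugeNullDemo {
    public interface Metric {}
    public interface Gauge<T> extends Metric { T getValue(); }

    // Attempt to decide at registration ("notify") time whether a gauge is
    // numeric. A null value carries no runtime type, so the check cannot work.
    public static String classifyAtNotify(Gauge<?> gauge) {
        Object value = gauge.getValue();
        if (value == null) {
            return "unknown";  // cannot tell: null defeats instanceof
        }
        return value instanceof Number ? "numeric" : "non-numeric";
    }

    public static void main(String[] args) {
        Gauge<Long> longGauge = () -> 42L;
        Gauge<String> stringGauge = () -> "up";
        Gauge<Long> notReadyYet = () -> null;  // no value yet at registration

        System.out.println(classifyAtNotify(longGauge));    // numeric
        System.out.println(classifyAtNotify(stringGauge));  // non-numeric
        System.out.println(classifyAtNotify(notReadyYet));  // unknown
    }
}
```

This is why the comment argues the classification can only happen (somewhat reliably) at report time, when a value is more likely to be present.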

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup 
> group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, we force 
> every reporter to do something like this:
> {code}
> if (metric instanceof Counter) {
>   Counter c = (Counter) metric;
>   // deal with counter
> } else if (metric instanceof Gauge) {
>   // deal with gauge
> } else if (metric instanceof Histogram) {
>   // deal with histogram
> } else if (metric instanceof Meter) {
>   // deal with meter
> } else {
>   // log something or throw an exception
> }
> {code}
> This has a few issues:
> * the instanceof checks and casts are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would, however, be a breaking 
> change and blow up the interface from the current 4 methods to 12. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.
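
For illustration, a sketch of the explicit-methods idea with simplified stand-in types (all names invented; this is not Flink's actual MetricReporter interface). In Java 8+, default no-op methods could also soften the breaking-change concern, since a reporter would only override the types it can handle:

```java
import java.util.ArrayList;
import java.util.List;

public class ExplicitReporterSketch {
    // Simplified stand-in metric types (not Flink's actual classes).
    public interface Counter { long getCount(); }
    public interface Gauge<T> { T getValue(); }
    public interface Histogram { long getCount(); }
    public interface Meter { double getRate(); }

    // One explicit method per metric type: no instanceof checks, no casts.
    // Default no-ops mean a reporter simply ignores types it cannot handle.
    public interface RichReporter {
        default void notifyOfAddedCounter(Counter counter, String name) {}
        default void notifyOfAddedGauge(Gauge<?> gauge, String name) {}
        default void notifyOfAddedHistogram(Histogram histogram, String name) {}
        default void notifyOfAddedMeter(Meter meter, String name) {}
    }

    public static List<String> demo() {
        List<String> seen = new ArrayList<>();
        // A reporter that only cares about counters overrides one method.
        RichReporter countersOnly = new RichReporter() {
            @Override
            public void notifyOfAddedCounter(Counter counter, String name) {
                seen.add(name + "=" + counter.getCount());
            }
        };
        countersOnly.notifyOfAddedCounter(() -> 7L, "numRecordsIn");
        countersOnly.notifyOfAddedGauge(() -> "up", "status");  // default no-op
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());  // [numRecordsIn=7]
    }
}
```

Note that this sketch also avoids the "throw in the final else block" trap: an unhandled metric type simply falls through to a no-op.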



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998
 ] 

mingleizhang edited comment on FLINK-6351 at 4/21/17 3:10 AM:
--

[~till.rohrmann] Hi, Till. Please check it out and let me know what you think. 
If there are no further questions, I will work on this soon. Thanks!


was (Author: mingleizhang):
[~till.rohrmann] Hi, Till. Please check it out and let me know what you think. 
Thanks!

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner, getting 
> rid of the lock in the run method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998
 ] 

mingleizhang commented on FLINK-6351:
-

[~till.rohrmann] Hi, Till. Please check it out and let me know what you think. 
Thanks!

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner, getting 
> rid of the lock in the run method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6351:

Description: Currently, YarnFlinkApplicationMasterRunner inherits from 
AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We 
could combine the two classes and then instantiate the services and runtime 
components in the constructor of YarnFlinkApplicationMasterRunner, getting rid 
of the lock in the run method.

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>
> Currently, YarnFlinkApplicationMasterRunner inherits from 
> AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We 
> could combine the two classes and then instantiate the services and runtime 
> components in the constructor of YarnFlinkApplicationMasterRunner, getting 
> rid of the lock in the run method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-6351:

Priority: Major  (was: Minor)

> Refactoring YarnFlinkApplicationMasterRunner by combining 
> AbstractYarnFlinkApplicationMasterRunner in one class
> ---
>
> Key: FLINK-6351
> URL: https://issues.apache.org/jira/browse/FLINK-6351
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Reporter: mingleizhang
>Assignee: mingleizhang
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class

2017-04-20 Thread mingleizhang (JIRA)
mingleizhang created FLINK-6351:
---

 Summary: Refactoring YarnFlinkApplicationMasterRunner by combining 
AbstractYarnFlinkApplicationMasterRunner in one class
 Key: FLINK-6351
 URL: https://issues.apache.org/jira/browse/FLINK-6351
 Project: Flink
  Issue Type: Improvement
  Components: YARN
Reporter: mingleizhang
Assignee: mingleizhang
Priority: Minor






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment

2017-04-20 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-6350:

Component/s: Streaming

> Flink: Windowing does not work with streams from collections and the local 
> execution environment
> 
>
> Key: FLINK-6350
> URL: https://issues.apache.org/jira/browse/FLINK-6350
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.2.2
>
>
> When ingesting events via the {{fromCollection}} method of 
> {{StreamExecutionEnvironment}}, window timing is not supported: the time 
> windows close immediately.
> This is unfortunate because mocking events from collections and testing them 
> locally is a powerful way to unit test stream processors.
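
As background on the reported behavior (not a claim about the root cause of this particular bug): with finite sources such as collections, the watermark typically jumps straight to Long.MAX_VALUE once the source is exhausted, which makes every pending event-time window fire at once. The toy model below (plain Java, not Flink code; all names invented) illustrates that effect:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class FiniteSourceWindowDemo {
    private final long windowSize;
    // windowEnd -> buffered elements for that window
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    private final List<String> fired = new ArrayList<>();

    FiniteSourceWindowDemo(long windowSize) { this.windowSize = windowSize; }

    void element(long timestamp, String value) {
        long windowEnd = (timestamp / windowSize + 1) * windowSize;
        pending.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
    }

    void watermark(long wm) {
        // Fire every window whose end is at or below the watermark.
        while (!pending.isEmpty() && pending.firstKey() <= wm) {
            long end = pending.firstKey();
            fired.add("window@" + end + pending.remove(end));
        }
    }

    public static List<String> run() {
        FiniteSourceWindowDemo op = new FiniteSourceWindowDemo(10);
        op.element(1, "a");
        op.element(12, "b");
        // The finite source is exhausted: the watermark jumps straight to
        // Long.MAX_VALUE, so both windows fire at once instead of over time.
        op.watermark(Long.MAX_VALUE);
        return op.fired;
    }

    public static void main(String[] args) {
        System.out.println(run());  // [window@10[a], window@20[b]]
    }
}
```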



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment

2017-04-20 Thread Bowen Li (JIRA)
Bowen Li created FLINK-6350:
---

 Summary: Flink: Windowing does not work with streams from 
collections and the local execution environment
 Key: FLINK-6350
 URL: https://issues.apache.org/jira/browse/FLINK-6350
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.2.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.2.2


When ingesting events via the {{fromCollection}} method of 
{{StreamExecutionEnvironment}}, window timing is not supported: the time 
windows close immediately.

This is unfortunate because mocking events from collections and testing them 
locally is a powerful way to unit test stream processors.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2017-04-20 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977893#comment-15977893
 ] 

Bowen Li edited comment on FLINK-5095 at 4/21/17 1:02 AM:
--

What's the main disadvantage of having counters, meters, and gauges handled 
separately, besides the increased number of API methods?


was (Author: phoenixjiangnan):
What's the main disadvantage of having counters, meters, and gauges separately, 
besides the increased number of API methods?

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup 
> group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, we force 
> every reporter to do something like this:
> {code}
> if (metric instanceof Counter) {
>   Counter c = (Counter) metric;
>   // deal with counter
> } else if (metric instanceof Gauge) {
>   // deal with gauge
> } else if (metric instanceof Histogram) {
>   // deal with histogram
> } else if (metric instanceof Meter) {
>   // deal with meter
> } else {
>   // log something or throw an exception
> }
> {code}
> This has a few issues:
> * the instanceof checks and casts are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would, however, be a breaking 
> change and blow up the interface from the current 4 methods to 12. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface

2017-04-20 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977893#comment-15977893
 ] 

Bowen Li commented on FLINK-5095:
-

What's the main disadvantage of having counters, meters, and gauges separately, 
besides the increased number of API methods?

> Add explicit notifyOfAddedX methods to MetricReporter interface
> ---
>
> Key: FLINK-5095
> URL: https://issues.apache.org/jira/browse/FLINK-5095
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Chesnay Schepler
>Priority: Minor
>
> I would like to start a discussion on the MetricReporter interface, 
> specifically the methods that notify a reporter of added or removed metrics.
> Currently, the methods are defined as follows:
> {code}
> void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
> void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup 
> group);
> {code}
> All metrics, regardless of their actual type, are passed to the reporter with 
> these methods.
> Since the different metric types have to be handled differently, we force 
> every reporter to do something like this:
> {code}
> if (metric instanceof Counter) {
>   Counter c = (Counter) metric;
>   // deal with counter
> } else if (metric instanceof Gauge) {
>   // deal with gauge
> } else if (metric instanceof Histogram) {
>   // deal with histogram
> } else if (metric instanceof Meter) {
>   // deal with meter
> } else {
>   // log something or throw an exception
> }
> {code}
> This has a few issues:
> * the instanceof checks and casts are unnecessary overhead
> * it requires the implementer to be aware of every metric type
> * it encourages throwing an exception in the final else block
> We could remedy all of these by reworking the interface to contain explicit 
> add/remove methods for every metric type. This would, however, be a breaking 
> change and blow up the interface from the current 4 methods to 12. We could 
> also add a RichMetricReporter interface with these methods, which would 
> require relatively few changes but add additional complexity.
> I was wondering what other people think about this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6309) Memory consumer weights should be calculated in job vertex level

2017-04-20 Thread Xu Pingyong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977892#comment-15977892
 ] 

Xu Pingyong commented on FLINK-6309:


Memory consumer weights should be considered not only in terms of chaining, 
but also in terms of slot sharing.

> Memory consumer weights should be calculated in job vertex level
> 
>
> Key: FLINK-6309
> URL: https://issues.apache.org/jira/browse/FLINK-6309
> Project: Flink
>  Issue Type: Improvement
>  Components: Optimizer
>Reporter: Kurt Young
>Assignee: Xu Pingyong
>
> Currently in {{PlanFinalizer}}, we traverse all job vertices to calculate 
> the memory consumer weights, and then assign the weights to each job vertex. 
> In the case of a large job graph, e.g. one with multiple joins and group 
> reduces, the consumer weights will be high and the usable memory for each job 
> vertex will be very low. 
> I think it makes more sense to calculate the memory consumer weights at the 
> job vertex level (after chaining) to maximize memory utilization.
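
As a back-of-the-envelope illustration of the effect (all numbers invented; this is not the actual {{PlanFinalizer}} computation): if each consumer's share is proportional to its weight over the total weight, then computing the total over the whole plan, rather than per vertex, shrinks every operator's share.

```java
public class ConsumerWeightDemo {
    // Each memory consumer gets managedMemory * (ownWeight / totalWeight).
    public static double share(long managedMemory, int ownWeight, int totalWeight) {
        return managedMemory * ((double) ownWeight / totalWeight);
    }

    public static void main(String[] args) {
        long managedMemoryPerSlot = 1024;  // MB, invented figure
        // Whole-plan weights: 8 memory consumers counted across the job graph,
        // even though only 2 of them ever run in this vertex's slot.
        System.out.println(share(managedMemoryPerSlot, 1, 8));  // 128.0
        // Per-vertex weights: only the 2 consumers actually in this vertex.
        System.out.println(share(managedMemoryPerSlot, 1, 2));  // 512.0
    }
}
```

Under these invented numbers, per-vertex accounting quadruples the memory available to each consumer, which is the utilization gain the ticket is after.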



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977891#comment-15977891
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

GitHub user PangZhi opened a pull request:

https://github.com/apache/flink/pull/3748

[FLINK-6225] [Cassandra Connector] add CassandraTableSink

This PR is to address https://issues.apache.org/jira/browse/FLINK-6225

- add Row type support for CassandraSink
- add StreamTableSink implementation for Cassandra


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PangZhi/flink 
FLINK-6225_add_cassandra_table_sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3748


commit 8bd2816a5d5999575bd98b5d3096af5f36f6f173
Author: Jing Fan 
Date:   2017-04-21T00:54:36Z

[FLINK-6225] [Cassandra Connector] add CassandraTableSink




> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.3.0
>
>
> Currently in CassandraSink, specifying a query is not supported for row 
> streams. The solution should be similar to CassandraTupleSink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...

2017-04-20 Thread PangZhi
GitHub user PangZhi opened a pull request:

https://github.com/apache/flink/pull/3748

[FLINK-6225] [Cassandra Connector] add CassandraTableSink

This PR is to address https://issues.apache.org/jira/browse/FLINK-6225

- add Row type support for CassandraSink
- add StreamTableSink implementation for Cassandra


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/PangZhi/flink 
FLINK-6225_add_cassandra_table_sink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3748


commit 8bd2816a5d5999575bd98b5d3096af5f36f6f173
Author: Jing Fan 
Date:   2017-04-21T00:54:36Z

[FLINK-6225] [Cassandra Connector] add CassandraTableSink




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-6349) Enforce per-subtask record ordering on resharding for FlinkKinesisConsumer

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6349:
--

 Summary: Enforce per-subtask record ordering on resharding for 
FlinkKinesisConsumer
 Key: FLINK-6349
 URL: https://issues.apache.org/jira/browse/FLINK-6349
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Tzu-Li (Gordon) Tai


As described in FLINK-6316, the Kinesis consumer currently does not provide any 
ordering guarantees when resharding occurs.

While this cannot be enforced globally (i.e. if a merged / split shard's child 
shard ends up in a different subtask, we cannot do any coordination for an 
ordering guarantee), we can definitely enforce it locally for each subtask. 
Simply put, we can still enforce ordering locally by making sure that 
discovered child shards are consumed only after all of their parent shards that 
were on the same subtask have been fully consumed.

To do this, we would also need to add "parent shard" information to 
{{KinesisStreamShard}} (Flink's representation of Kinesis shards).

This would be directly beneficial for per-shard watermarks (FLINK-5697) to 
retain per-shard time characteristics after a reshard, and therefore can be 
seen as a prerequisite.
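
The local ordering rule above could be sketched roughly as follows (all class and method names invented; this is not the actual connector code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ShardOrderingSketch {
    private final Set<String> ownedShards;
    private final Set<String> finishedShards = new HashSet<>();

    public ShardOrderingSketch(List<String> owned) {
        this.ownedShards = new HashSet<>(owned);
    }

    public void markFinished(String shardId) {
        finishedShards.add(shardId);
    }

    // A child shard may be consumed once every parent shard that lives on
    // THIS subtask has been fully consumed; parents on other subtasks are
    // ignored, since the ordering guarantee is only local.
    public boolean canConsume(List<String> parentShardIds) {
        for (String parent : parentShardIds) {
            if (ownedShards.contains(parent) && !finishedShards.contains(parent)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ShardOrderingSketch subtask = new ShardOrderingSketch(Arrays.asList("shard-0"));
        // Child of shard-0 (ours) and shard-1 (owned by another subtask).
        List<String> parents = Arrays.asList("shard-0", "shard-1");
        System.out.println(subtask.canConsume(parents));  // false: shard-0 not done
        subtask.markFinished("shard-0");
        System.out.println(subtask.canConsume(parents));  // true
    }
}
```

This is also why the parent-shard information would need to be carried in {{KinesisStreamShard}}: without it, the subtask cannot know which finished shards gate a discovered child.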



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6348) Migrate from Java serialization for RollingSink's state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6348:
--

 Summary: Migrate from Java serialization for RollingSink's state
 Key: FLINK-6348
 URL: https://issues.apache.org/jira/browse/FLINK-6348
 Project: Flink
  Issue Type: Sub-task
Reporter: Tzu-Li (Gordon) Tai


See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for 
{{RollingSink}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6347) Migrate from Java serialization for MessageAcknowledgingSourceBase's state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6347:
--

 Summary: Migrate from Java serialization for 
MessageAcknowledgingSourceBase's state
 Key: FLINK-6347
 URL: https://issues.apache.org/jira/browse/FLINK-6347
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for 
{{MessageAcknowledgingSourceBase}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6346) Migrate from Java serialization for GenericWriteAheadSink's state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6346:
--

 Summary: Migrate from Java serialization for 
GenericWriteAheadSink's state
 Key: FLINK-6346
 URL: https://issues.apache.org/jira/browse/FLINK-6346
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for 
{{GenericWriteAheadSink}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6345:
--

 Summary: Migrate from Java serialization for 
ContinuousFileReaderOperator's state
 Key: FLINK-6345
 URL: https://issues.apache.org/jira/browse/FLINK-6345
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for 
{{ContinuousFileReaderOperator}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6344) Migrate from Java serialization for BucketingSink's state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6344:
---
Summary: Migrate from Java serialization for BucketingSink's state  (was: 
Migrate from Java serialization for `BucketingSink`'s state)

> Migrate from Java serialization for BucketingSink's state
> -
>
> Key: FLINK-6344
> URL: https://issues.apache.org/jira/browse/FLINK-6344
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration 
> for `BucketingSink`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6344) Migrate from Java serialization for `BucketingSink`'s state

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6344:
--

 Summary: Migrate from Java serialization for `BucketingSink`'s 
state
 Key: FLINK-6344
 URL: https://issues.apache.org/jira/browse/FLINK-6344
 Project: Flink
  Issue Type: Sub-task
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for 
`BucketingSink`.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6343:
---
Description: 
With FLINK-6324, the Java serialization shortcut for operator state is now 
deprecated to discourage its usage.

These sources / sinks are still using this shortcut, and should be migrated to 
use {{getListState(descriptor)}} instead:
- {{BucketingSink}}
- {{ContinuousFileReaderOperator}}
- {{GenericWriteAheadSink}}
- {{MessageAcknowledgingSourceBase}}
- {{RollingSink}}
- {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration 
included with FLINK-4022)

To ease review, I propose to open up subtasks under this JIRA and separate PRs 
for the migration of each single source / sink.

  was:
With FLINK-6324, the Java serialization shortcut for operator state is now 
deprecated to discourage its usage.

These sources / sinks are still using this shortcut, and should be migrated to 
use `getListState(descriptor)` instead:
- {{BucketingSink}}
- {{ContinuousFileReaderOperator}}
- {{GenericWriteAheadSink}}
- {{MessageAcknowledgingSourceBase}}
- {{RollingSink}}
- {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration 
included with FLINK-4022)

To ease review, I propose to open up subtasks under this JIRA and separate PRs 
for the migration of each single source / sink.


> Migrate from discouraged Java serialization for all sources / sinks
> ---
>
> Key: FLINK-6343
> URL: https://issues.apache.org/jira/browse/FLINK-6343
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> With FLINK-6324, the Java serialization shortcut for operator state is now 
> deprecated to discourage its usage.
> These sources / sinks are still using this shortcut, and should be migrated 
> to use {{getListState(descriptor)}} instead:
> - {{BucketingSink}}
> - {{ContinuousFileReaderOperator}}
> - {{GenericWriteAheadSink}}
> - {{MessageAcknowledgingSourceBase}}
> - {{RollingSink}}
> - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration 
> included with FLINK-4022)
> To ease review, I propose to open up subtasks under this JIRA and separate 
> PRs for the migration of each single source / sink.
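
As a rough illustration of why per-element list state is preferable to the Java-serialization shortcut (plain Java below, not Flink's state API; all method names invented): a single serialized blob is opaque to the runtime, while individually serialized elements can be redistributed when the job is rescaled.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StateStyleDemo {
    // "Shortcut" style: the whole list becomes one opaque Java-serialized
    // blob, which the runtime cannot split across parallel instances.
    public static byte[] snapshotAsBlob(List<Long> offsets) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(new ArrayList<>(offsets));
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // "List state" style: each element is serialized on its own, so the
    // runtime is free to redistribute elements on rescale.
    public static List<byte[]> snapshotAsElements(List<Long> offsets) {
        try {
            List<byte[]> elements = new ArrayList<>();
            for (long offset : offsets) {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (DataOutputStream dos = new DataOutputStream(bos)) {
                    dos.writeLong(offset);
                }
                elements.add(bos.toByteArray());
            }
            return elements;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<Long> offsets = Arrays.asList(10L, 20L, 30L);
        System.out.println(snapshotAsBlob(offsets).length > 0);  // true
        System.out.println(snapshotAsElements(offsets).size());  // 3
    }
}
```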



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6343:
---
Description: 
With FLINK-6324, the Java serialization shortcut for operator state is now 
deprecated to discourage its usage.

These sources / sinks are still using this shortcut, and should be migrated to 
use `getListState(descriptor)` instead:
- {{BucketingSink}}
- {{ContinuousFileReaderOperator}}
- {{GenericWriteAheadSink}}
- {{MessageAcknowledgingSourceBase}}
- {{RollingSink}}
- {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration 
included with FLINK-4022)

To ease review, I propose to open up subtasks under this JIRA and separate PRs 
for the migration of each single source / sink.

  was:
With FLINK-6324, the Java serialization shortcut for operator state is now 
deprecated to discourage its usage.

These sources / sinks are still using this shortcut, and should be migrated to 
use `getListState(descriptor)` instead:
- `BucketingSink`
- `ContinuousFileReaderOperator`
- `GenericWriteAheadSink`
- `MessageAcknowledgingSourceBase`
- `RollingSink`
- `FlinkKafkaConsumerBase` (will be fixed along with the state migration 
included with FLINK-4022)

To ease review, I propose to open up subtasks under this JIRA and separate PRs 
for the migration of each single source / sink.


> Migrate from discouraged Java serialization for all sources / sinks
> ---
>
> Key: FLINK-6343
> URL: https://issues.apache.org/jira/browse/FLINK-6343
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>
> With FLINK-6324, the Java serialization shortcut for operator state is now 
> deprecated to discourage its usage.
> These sources / sinks are still using this shortcut, and should be migrated 
> to use `getListState(descriptor)` instead:
> - {{BucketingSink}}
> - {{ContinuousFileReaderOperator}}
> - {{GenericWriteAheadSink}}
> - {{MessageAcknowledgingSourceBase}}
> - {{RollingSink}}
> - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration 
> included with FLINK-4022)
> To ease review, I propose to open up subtasks under this JIRA and separate 
> PRs for the migration of each single source / sink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-6343:
--

 Summary: Migrate from discouraged Java serialization for all 
sources / sinks
 Key: FLINK-6343
 URL: https://issues.apache.org/jira/browse/FLINK-6343
 Project: Flink
  Issue Type: Improvement
  Components: Streaming Connectors
Reporter: Tzu-Li (Gordon) Tai


With FLINK-6324, the Java serialization shortcut for operator state is now 
deprecated to discourage its usage.

These sources / sinks are still using this shortcut, and should be migrated to 
use `getListState(descriptor)` instead:
- `BucketingSink`
- `ContinuousFileReaderOperator`
- `GenericWriteAheadSink`
- `MessageAcknowledgingSourceBase`
- `RollingSink`
- `FlinkKafkaConsumerBase` (will be fixed along with the state migration 
included with FLINK-4022)

To ease review, I propose to open up subtasks under this JIRA and separate PRs 
for the migration of each single source / sink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-20 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-6311.
--
   Resolution: Fixed
Fix Version/s: 1.2.2
   1.3.0

Resolved for master via 
http://git-wip-us.apache.org/repos/asf/flink/commit/a0249d9.

Resolved for release-1.2 via 
http://git-wip-us.apache.org/repos/asf/flink/commit/80dc704.

> NPE in FlinkKinesisConsumer if source was closed before run
> ---
>
> Key: FLINK-6311
> URL: https://issues.apache.org/jira/browse/FLINK-6311
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: mingleizhang
> Fix For: 1.3.0, 1.2.2
>
>
> This was reported by a user: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html
> The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected 
> against the condition when the source was closed before it started running. 
> Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks.





[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977768#comment-15977768
 ] 

ASF GitHub Bot commented on FLINK-6311:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3738







[GitHub] flink pull request #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKines...

2017-04-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3738


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977755#comment-15977755
 ] 

ASF GitHub Bot commented on FLINK-6013:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
Green build! @zentol 


> Add Datadog HTTP metrics reporter
> -
>
> Key: FLINK-6013
> URL: https://issues.apache.org/jira/browse/FLINK-6013
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Critical
> Fix For: 1.3.0
>
>
> We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a 
> lot of other companies do as well.
> Flink currently only has a StatsD metrics reporter, so users have to set up 
> the Datadog Agent in order to receive metrics from StatsD and forward them to 
> Datadog. We don't like this approach.
> We prefer a Datadog metrics reporter that contacts the Datadog HTTP 
> endpoint directly.
> I'll take this ticket myself.





[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter

2017-04-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3736
  
Green build! @zentol 




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977669#comment-15977669
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112488004
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends 
UnaryExpression {
 * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
 */
   private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall
+
+  /**
+* Because SqlAggFunction from Calcite's AggCallImpl is invisible,
--- End diff --

Thanks for the explanation of this method.
Can you update the doc to `Returns the SqlAggFunction for this Aggregation` 
and change the method name to `getSqlAggFunction()`? The method does not 
convert the Aggregation, because the returned SqlAggFunction does not contain 
the arguments.


> Integrating the OVER windows in the Table API
> -
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>.overWindows(
> (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
> order_by_expression] 
>   (preceding  
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>  [following 
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
> as alias,...[n])
>)
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> * The ORDER BY clause: before the 
> [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation, 
> orderBy may only be 'rowtime'/'proctime' (for stream) or a specific time field (for 
> batch).
> * FOLLOWING is not supported.
> User interface design document [See | 
> https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]





[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977648#comment-15977648
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112456485
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which over-based aggregates can be computed.
+*
+* Over window for batch tables are currently not supported.
+*
+* @param overWindows windows that specifies how elements are grouped.
+* @return Over windowed table
+*/
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw TableException("Over window for batch tables are currently not 
supported.")
+} else {
+  overWindows.foreach { overWindow =>
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+if (!orderName.equalsIgnoreCase("rowtime")
+  && !orderName.equalsIgnoreCase("proctime")) {
+  throw ValidationException(
+s"OrderBy expression must be ['rowtime] or ['proctime], but 
got ['${orderName}]")
+}
+  }
+}
+
+if (overWindows.size != 1) {
+  throw TableException("OverWindow only supported single window at 
current time.")
+}
+
+overWindows.foreach { overWindow =>
+  if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
--- End diff --

This check should be done in `OverCall.validateInput()` as well.







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977681#comment-15977681
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570095
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
+  val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+  create(unbounded, null)
+} else if (precedingValue == 0L) {
+  val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+  create(currentRow, null)
+} else {
+
+  val returnType = new BasicSqlType(
+relBuilder.getTypeFactory.getTypeSystem,
+SqlTypeName.DECIMAL)
+
+  val sqlOperator = new SqlPostfixOperator(
+sqlKind.name,
+sqlKind,
+2,
+new OrdinalReturnTypeInference(0),
+null,
+null)
+
+  val operands: Array[SqlNode] = new Array[SqlNode](1)
+  operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
+
+  val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+  val expressions: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+  expressions.add(relBuilder.literal(precedingValue))
+
+  val rexNode = relBuilder.getRexBuilder.makeCall(returnType, 
sqlOperator, expressions)
+
+  create(node, rexNode)
+}
+  }
+
+  override private[flink] def children: Seq[Expression] = Seq()
--- End diff --

maybe we can check the partitionBy 

[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977653#comment-15977653
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112482490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
 ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, 
ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+  * An over window specification.
+  *
+  * Over window is similar to the traditional OVER SQL.
+  */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: Expression): OverWindow = {
--- End diff --

I think it would be good if we could enforce the alias. 
This could be done by first building a window that is not of type 
`OverWindow` and letting `as()` return the complete `OverWindow`.
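The suggestion can be sketched with two illustrative types (hypothetical names, not the actual Table API classes): the pre-alias builder is a distinct type, so the only way to obtain an `OverWindow` is through `as()`:

```java
// Hypothetical sketch of an alias-enforcing builder: the fluent API only ever
// hands out PartialOverWindow, and as() is the sole path to an OverWindow.
class PartialOverWindow {
    final String orderBy;

    PartialOverWindow(String orderBy) {
        this.orderBy = orderBy;
    }

    // Terminal step; the alias cannot be forgotten.
    OverWindow as(String alias) {
        return new OverWindow(orderBy, alias);
    }
}

class OverWindow {
    final String orderBy;
    final String alias;

    OverWindow(String orderBy, String alias) {
        this.orderBy = orderBy;
        this.alias = alias;
    }
}
```

Since `window(overWindows: OverWindow*)` accepts only `OverWindow`, a window whose alias was never set simply does not type-check, and no runtime validation is needed for the missing-alias case.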







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977667#comment-15977667
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112501222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.TableException
+
+object SqlTypeUtils {
--- End diff --

Util class can be removed. We can create a `RelDataType` from a 
`TypeInformation` as follows:

```

relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo)
```







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977678#comment-15977678
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112561580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
--- End diff --

Add spaces: `foreach( x =>`







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977655#comment-15977655
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112471307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
--- End diff --

The aggregations are automatically resolved and validated if we add `agg` 
to the children of `OverCall`
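The mechanism behind this remark can be shown with a toy expression tree (illustrative types, not Flink's): the resolution pass recurses over `children()`, so exposing `agg` as a child gets it resolved by the generic walk without any special-casing in the parent:

```java
import java.util.Collections;
import java.util.List;

// Toy expression tree: resolveAll() recurses over children(), so any
// expression listed as a child is handled automatically by the walk.
abstract class Expr {
    boolean resolved = false;

    abstract List<Expr> children();

    void resolveAll() {
        for (Expr c : children()) {
            c.resolveAll();
        }
        resolved = true;
    }
}

class Field extends Expr {
    final String name;

    Field(String name) { this.name = name; }

    List<Expr> children() { return Collections.emptyList(); }
}

class Over extends Expr {
    final Expr agg;

    Over(Expr agg) { this.agg = agg; }

    // Exposing agg as a child means the generic pass resolves it too.
    List<Expr> children() { return Collections.singletonList(agg); }
}
```

In the same spirit, returning `Seq(agg)` instead of `Seq()` from `OverCall.children` would let the planner's existing resolution handle the aggregation.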







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977661#comment-15977661
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112473915
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which over-based aggregates can be computed.
+*
+* Over window for batch tables are currently not supported.
+*
+* @param overWindows windows that specifies how elements are grouped.
+* @return Over windowed table
+*/
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw TableException("Over window for batch tables are currently not 
supported.")
+} else {
+  overWindows.foreach { overWindow =>
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
--- End diff --

This check should be done in `OverCall.validateInput()`







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977674#comment-15977674
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112495783
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalNode) extend
 case expr if !expr.valid => u
 case c @ Cast(ne: NamedExpression, tp) => Alias(c, 
s"${ne.name}-$tp")
 case gcf: GetCompositeField => Alias(gcf, 
gcf.aliasName().getOrElse(s"_c$i"))
+case over: OverCall if null != over.aggAlias =>
--- End diff --

We don't need this if we handle the aggregation alias as a regular `Alias` 
expression.







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977658#comment-15977658
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112485134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
--- End diff --

This class should override `checkInputs()` and verify that the interval 
configuration is valid (unbounded preceding to current row, or value preceding 
to current row), and it should also perform the checks that were moved out of 
`table.scala`.
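To make the requested check concrete, here is a hedged, dependency-free sketch. The `ValidationResult` ADT mirrors the spirit of flink-table's validation framework, but the `Long` encoding of bounds (with `0L` standing in for CURRENT ROW) is purely an illustrative assumption, not the actual `OverWindow` representation.

```scala
sealed trait ValidationResult
case object ValidationSuccess extends ValidationResult
case class ValidationFailure(message: String) extends ValidationResult

object OverWindowChecks {
  // Valid configurations per the restrictions in the issue description:
  // (UNBOUNDED | n) PRECEDING up to CURRENT ROW; FOLLOWING is unsupported.
  def validateBounds(preceding: Long, following: Long): ValidationResult =
    if (preceding < 0L)
      ValidationFailure("preceding must be non-negative")
    else if (following != 0L)
      ValidationFailure("FOLLOWING is not supported; use CURRENT ROW")
    else
      ValidationSuccess
}
```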


> Integrating the OVER windows in the Table API
> -
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>


[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977679#comment-15977679
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
 
 val orderKeys = overWindow.orderKeys.getFieldCollations
 
-if (orderKeys.size() != 1) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
by a single time column.")
-}
-val orderKey = orderKeys.get(0)
+val timeType = if (!orderKeys.isEmpty) {
+  if (orderKeys.size() != 1) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
by a single time " +
+"column.")
+  }
+  val orderKey = orderKeys.get(0)
 
-if (!orderKey.direction.equals(ASCENDING)) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  if (!orderKey.direction.equals(ASCENDING)) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  }
+  inputType
+.getFieldList
+.get(orderKey.getFieldIndex)
+.getValue.asInstanceOf[TimeModeType]
+} else {
--- End diff --

this could be removed if we don't use literals for the orderBy() field




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977680#comment-15977680
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112557404
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
--- End diff --

Add a space: `val sets: util.HashSet`




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977662#comment-15977662
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112461145
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
+  throw new TableException(s"Reference $aggName is ambiguous.")
+} else if (candidates.isEmpty) {
+  throw new TableException(s"Can not resolve [$aggName].")
+} else {
+  Some(candidates.head.withName(aggName))
+}
+
+projectList += new OverCall(
--- End diff --

remove `new`




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977657#comment-15977657
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112469838
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends 
UnaryExpression {
 * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
 */
   private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall
+
+  /**
+* Because SqlAggFunction from Calcite's AggCallImpl is invisible,
+* we have to manually create sqlAggFunction in flink code base.
+*
+*/
+  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): 
SqlAggFunction
+
+  /**
+* Attach the Resolved Child to aggregation
+*/
+  private[flink] def withResolvedChild(child: Expression): Aggregation
--- End diff --

remove the `withResolvedChild()` method as we validate the aggregation 
argument with the default validation.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977675#comment-15977675
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112567644
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
--- End diff --

Please use the constants defined in `expressionDsl.scala` for the checks



[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977659#comment-15977659
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r11245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
--- End diff --

Extend docs to

```
  /**
* Defines over-windows on the records of a table.
*
* An over-window defines for each record an interval of records over 
which aggregation 
* functions can be computed.
*
* Example:
*
* {{{
*   table
* .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds 
as 'ow)
* .select('c, 'b.count over 'ow, 'e.sum over 'ow)
* }}}
*
* __Note__: Computing over window aggregates on a streaming table is 
only a parallel operation
* if the window is partititioned. Otherwise, the whole stream will be 
processed by a single
* task, i.e., with parallelism 1.
*
* __Note__: Over-windows for batch tables are currently not supported.
*
* @param overWindows windows that specify the record interval over 
which aggregations are
*computed.
* @return An OverWindowedTable to specify the aggregations.
*/
```




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977660#comment-15977660
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112456740
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -928,6 +969,27 @@ class WindowedTable(
 
 }
 
+class OverWindowedTable(
+private[flink] val table: Table,
+private[flink] val overWindows: OverWindow*) {
--- End diff --

Please use `Array` instead of varargs for internal methods & classes.
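A minimal sketch of the suggested pattern: keep the convenient varargs signature only in the public DSL and convert once at the boundary, so internal classes receive a plain `Array`. Class names loosely mirror the diff; the bodies are placeholders, not the real Table API implementation.

```scala
class OverWindow
class Table

// Internal class: takes an Array, not varargs.
class OverWindowedTable(val table: Table, val overWindows: Array[OverWindow])

class TableDsl(self: Table) {
  // Public API keeps varargs for user convenience; convert at the boundary.
  def window(overWindows: OverWindow*): OverWindowedTable =
    new OverWindowedTable(self, overWindows.toArray)
}
```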




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977665#comment-15977665
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112456197
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which over-based aggregates can be computed.
+*
+* Over window for batch tables are currently not supported.
+*
+* @param overWindows windows that specifies how elements are grouped.
+* @return Over windowed table
+*/
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw TableException("Over window for batch tables are currently not 
supported.")
+} else {
+  overWindows.foreach { overWindow =>
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
--- End diff --

The other operators are validated later. Can you move this check to 
`OverCall.validateInput()`?

Please add tests to validate that the checks work.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977671#comment-15977671
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112567381
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
--- End diff --

`precedingValue` -> `value`



[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977654#comment-15977654
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112476982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -364,6 +365,21 @@ trait ImplicitExpressionOperations {
   def position(haystack: Expression) = Position(expr, haystack)
 
   /**
+* For windowing function to config over window
+* e.g.:
+* table
+*   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows 
following CURRENT_ROW as 'w)
+*   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+*/
+  def over(alias: Expression) = {
+expr match {
+  case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], 
null, alias)
--- End diff --

rm `new` (case classes should be instantiated with the `apply()` method, 
i.e., without `new`)
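The reviewer's point can be shown with a stand-in case class (`Point` is purely illustrative): the compiler synthesizes a companion `apply()` for every case class, so the `new` is redundant and both forms build equal values.

```scala
case class Point(x: Int, y: Int)

object ApplyDemo {
  val viaNew   = new Point(1, 2) // compiles, but non-idiomatic for case classes
  val viaApply = Point(1, 2)     // idiomatic: uses the synthesized apply()
}
```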




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977677#comment-15977677
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570409
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -23,7 +23,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, 
RelDataTypeFieldImpl}
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rex.{RexInputRef}
+import org.apache.calcite.rex.{RexInputRef, RexLiteral}
--- End diff --

All these changes could be undone if we don't use a literal for the 
orderBy() expression




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112461145
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
+  throw new TableException(s"Reference $aggName is ambiguous.")
+} else if (candidates.isEmpty) {
+  throw new TableException(s"Can not resolve [$aggName].")
+} else {
+  Some(candidates.head.withName(aggName))
+}
+
+projectList += new OverCall(
--- End diff --

remove `new`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112560976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
--- End diff --

If we do this, we could remove the special literal handling in 
`OverAggregate` and `DataStreamOverAggregate`.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977684#comment-15977684
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570917
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+val data = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (3L, 3, "Hello"),
+  (4L, 4, "Hello"),
+  (5L, 5, "Hello"),
+  (6L, 6, "Hello"),
+  (7L, 7, "Hello World"),
+  (8L, 8, "Hello World"),
+  (20L, 20, "Hello World"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+val windowedTable = table
+  .window(
+Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+  .select('c, 'b.count over 'w as 'mycount)
+  .select('c, 'mycount)
+
+val results = windowedTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq(
+  "Hello World,1", "Hello World,2", "Hello World,3",
+  "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+env.setParallelism(1)
+
+val data = Seq(
+  Left(1405L, (1, 1L, "Hi")),
+  Left(1400L, (2, 1L, "Hello")),
+  Left(1402L, (1, 1L, "Hello")),
+  Left(1402L, (1, 2L, "Hello")),
+  Left(1402L, (1, 3L, "Hello world")),
+  Left(1403L, (2, 2L, "Hello world")),
+  Left(1403L, (2, 3L, "Hello world")),
+  Right(1420L),
+  Left(1421L, (1, 4L, "Hello world")),
+  Left(1422L, (1, 5L, "Hello world")),
+  Left(1422L, (1, 6L, "Hello world")),
+  Left(1422L, (1, 7L, "Hello world")),
+  Left(1423L, (2, 4L, "Hello world")),
+  Left(1423L, (2, 5L, "Hello world")),
+  Right(1430L)
+ 

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112469838
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends 
UnaryExpression {
 * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
 */
   private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall
+
+  /**
+* Because SqlAggFunction from Calcite's AggCallImpl is invisible,
+* we have to manually create sqlAggFunction in flink code base.
+*
+*/
+  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): 
SqlAggFunction
+
+  /**
+* Attach the Resolved Child to aggregation
+*/
+  private[flink] def withResolvedChild(child: Expression): Aggregation
--- End diff --

remove the `withResolvedChild()` method as we validate the aggregation 
argument with the default validation.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112571486
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
 ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, 
ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+  * An over window specification.
+  *
+  * Over window is similar to the traditional OVER SQL.
+  */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: Expression): OverWindow = {
+this.alias = alias
+this
+  }
+
+  /**
+* Partitions the elements on some partition keys.
+*
+* @param partitionBy
+* @return this over window
+*/
+  def partitionBy(partitionBy: String): OverWindow = {
+this.partitionBy(ExpressionParser.parseExpression(partitionBy))
+  }
+
+  /**
+* Partitions the elements on some partition keys.
+*
+* @param partitionBy
+* @return this over window
+*/
+  def partitionBy(partitionBy: Expression*): OverWindow = {
+this.partitionBy = partitionBy
+this
+  }
+
+
+  /**
+* Specifies the time mode.
+*
+* @param orderBy For streaming tables call [[orderBy 'rowtime or 
orderBy 'proctime]]
+*to specify time mode.
+* @return this over window
+*/
+  def orderBy(orderBy: String): OverWindow = {
+this.orderBy(ExpressionParser.parseExpression(orderBy))
+  }
+
+  /**
+* Specifies the time mode.
+*
+* @param orderBy For streaming tables call [[orderBy 'rowtime or 
orderBy 'proctime]]
+*to specify time mode.
+* @return this over window
+*/
+  def orderBy(orderBy: Expression): OverWindow = {
+this.orderBy = orderBy
+this
+  }
+
+  /**
+* Set the preceding offset (based on time or row-count intervals) for 
over window
+*
+* @param preceding forward offset that relative to the current row
+* @return this over window
+*/
+  def preceding(preceding: String): OverWindow = {
+this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+* Set the preceding offset (based on time or row-count intervals) for 
over window
+*
+* @param preceding forward offset that relative to the current row
+* @return this over window
+*/
+  def preceding(preceding: Expression): OverWindow = {
+this.preceding = preceding
+this
+  }
+
+  /**
+* Set the following offset (based on time or row-count intervals) for 
over window
+*
+* @param following subsequent offset that relative to the current row
+* @return this over window
+*/
+  def following(following: String): OverWindow = {
--- End diff --

Should we make `following` optional and default to CURRENT_ROW / 
CURRENT_RANGE depending on the type of `preceding`? I think that would be a 
nice shortcut and also be aligned with SQL.

What do you think @sunjincheng121 ?
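
A hedged sketch of the defaulting rule this suggests, assuming a simple bound hierarchy (`Bound` and its subtypes are stand-ins, not Flink's actual expression types): when `following` is absent, a row-count `preceding` defaults to CURRENT_ROW and a time-interval `preceding` to CURRENT_RANGE.

```scala
// Stand-in bound types for illustration only.
sealed trait Bound
case class RowCount(n: Long) extends Bound
case class TimeInterval(millis: Long) extends Bound
case object CurrentRow extends Bound
case object CurrentRange extends Bound

object FollowingDefault {
  // Derive the default `following` bound from the kind of `preceding`.
  def resolveFollowing(preceding: Bound, following: Option[Bound]): Bound =
    following.getOrElse(preceding match {
      case RowCount(_) | CurrentRow => CurrentRow    // ROWS window
      case _                        => CurrentRange  // RANGE window
    })
}
```

With this rule, `resolveFollowing(RowCount(2), None)` yields `CurrentRow`, while `resolveFollowing(TimeInterval(1000), None)` yields `CurrentRange`, matching the SQL defaults.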




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112494372
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
--- End diff --

The `aggAlias` should be removed.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977676#comment-15977676
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112499806
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
 
 val orderKeys = overWindow.orderKeys.getFieldCollations
 
-if (orderKeys.size() != 1) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
by a single time column.")
-}
-val orderKey = orderKeys.get(0)
+val timeType = if (!orderKeys.isEmpty) {
+  if (orderKeys.size() != 1) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
by a single time " +
+"column.")
+  }
+  val orderKey = orderKeys.get(0)
 
-if (!orderKey.direction.equals(ASCENDING)) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  if (!orderKey.direction.equals(ASCENDING)) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  }
+  inputType
+.getFieldList
+.get(orderKey.getFieldIndex)
+.getValue.asInstanceOf[TimeModeType]
+} else {
+  val it = logicWindow.constants.listIterator()
+  if (it.hasNext) {
+val item = it.next().getValue
+if (item.isInstanceOf[NlsString]) {
+  val value = item.asInstanceOf[NlsString].getValue
+  if (value.equalsIgnoreCase("rowtime")) {
+new RowTimeType
+  } else {
+new ProcTimeType
+  }
+}
+  }
 }
 
 val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
 val generator = new CodeGenerator(
-  tableEnv.getConfig,
-  false,
-  inputDS.getType)
-
-val timeType = inputType
-  .getFieldList
-  .get(orderKey.getFieldIndex)
-  .getValue
+tableEnv.getConfig,
--- End diff --

indent




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977683#comment-15977683
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112560976
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
--- End diff --

If we do this, we could remove the special literal handling in 
`OverAggregate` and `DataStreamOverAggregate`.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977668#comment-15977668
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112495160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
--- End diff --

The proper translation can be done in 
`ProjectTranslator.translateOverWindow()`




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977652#comment-15977652
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112460318
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
--- End diff --

`childOutput` or `parentOutput`?




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977673#comment-15977673
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112564205
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
--- End diff --

We also need to validate other properties of the overWindow:
- check that the partitionBy expressions are valid fields in the input. 
- If a partitionBy expression is not a field, we would need to push the 
expression into a Project before the Project with the overWindow and reference 
the new field. This would need to happen in `Table.window()`. I think for now 
we can have the restriction that only field references are allowed expressions.
- validate that `preceding` and `following` are literals
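
A self-contained sketch of the validation this asks for, under stated assumptions: `Expr`, `FieldRef`, and `Literal` are hypothetical stand-ins for Flink's Expression hierarchy, and only field references are accepted in `partitionBy` (the restriction proposed above).

```scala
// Stand-in expression types for illustration only.
sealed trait Expr
case class FieldRef(name: String) extends Expr
case class Literal(value: Any) extends Expr

object OverWindowValidation {
  // Throws on the first violated property; returns normally if all checks pass.
  def validate(partitionBy: Seq[Expr], preceding: Expr, following: Expr,
               inputFields: Set[String]): Unit = {
    partitionBy.foreach {
      case FieldRef(n) if inputFields.contains(n) => // ok: known input field
      case FieldRef(n) =>
        throw new IllegalArgumentException(s"Unknown partition field '$n'.")
      case other =>
        throw new IllegalArgumentException(
          s"Only field references are allowed in partitionBy, got $other.")
    }
    Seq("preceding" -> preceding, "following" -> following).foreach {
      case (_, Literal(_)) => // ok: bound is a literal
      case (name, other) =>
        throw new IllegalArgumentException(s"$name must be a literal, got $other.")
    }
  }
}
```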




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112459590
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
   def expandProjectList(
--- End diff --

I would not change the `expandProjectList()` method which has a dedicated 
purpose.

Instead, I would add a new method to `ProjectionTranslator`, which just 
translates the over windows:

```
def translateOverWindows(
  exprs: Seq[Expression],
  overWindows: Array[OverWindow]): Seq[Expression]
```
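
A minimal, hypothetical sketch of what such a dedicated translation step could look like — the `Expression`/`OverCall`/`OverWindow` types here are simplified stand-ins, not Flink's actual classes: each `OverCall` is paired with the window definition matching its alias, and all other expressions pass through untouched.

```scala
// Simplified stand-ins for Flink's expression and window types.
case class OverWindow(alias: String, partitionBy: Seq[String])
sealed trait Expression
case class Field(name: String) extends Expression
case class OverCall(agg: String, windowAlias: String,
                    window: Option[OverWindow] = None) extends Expression

object OverWindowTranslator {
  // Resolve each OverCall's window alias against the declared over windows.
  def translateOverWindows(exprs: Seq[Expression],
                           overWindows: Array[OverWindow]): Seq[Expression] =
    exprs.map {
      case oc @ OverCall(_, alias, _) =>
        overWindows.find(_.alias == alias) match {
          case Some(w) => oc.copy(window = Some(w))
          case None => throw new IllegalArgumentException(
            s"Could not resolve over window alias '$alias'.")
        }
      case e => e // non-over expressions pass through unchanged
    }
}
```

Keeping this logic out of `expandProjectList()` leaves that method with its single purpose, as the review suggests.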




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977651#comment-15977651
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112457606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
   def expandProjectList(
   exprs: Seq[Expression],
   parent: LogicalNode,
-  tableEnv: TableEnvironment)
-: Seq[Expression] = {
+  tableEnv: TableEnvironment,
+  overWindows: OverWindow*): Seq[Expression] = {
--- End diff --

Please use `Array` instead of varargs.
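A hedged illustration of what this suggestion amounts to (the names below are invented stand-ins for the sketch, not Flink's actual API): a Scala varargs parameter compiles to a `Seq`, which is awkward to call from Java and can collide with other overloads, while an `Array` parameter keeps the signature explicit.

```scala
// Illustrative sketch only; `OverWindow` here is a stand-in, not Flink's class.
final case class OverWindow(alias: String)

// Varargs version: convenient at Scala call sites, but the parameter compiles
// to Seq[OverWindow], which is harder to call from Java and can clash with
// other overloads.
def expandWithVarargs(exprs: Seq[String], overWindows: OverWindow*): Int =
  exprs.size + overWindows.size

// Array version, as the review suggests: the parameter type is explicit.
def expandWithArray(exprs: Seq[String], overWindows: Array[OverWindow]): Int =
  exprs.size + overWindows.length
```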







[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112482490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
 ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, 
ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+  * An over window specification.
+  *
+  * Over window is similar to the traditional OVER SQL.
+  */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: Expression): OverWindow = {
--- End diff --

I think it would be good if we could enforce the alias. 
This could be done by first building a window which is not of type 
`OverWindow` and letting `as()` return the complete `OverWindow`.
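The suggested pattern can be sketched as follows (a minimal model with invented names, not the actual Flink classes): the intermediate builder is not an `OverWindow`, so only `as()` can produce one and the alias cannot be omitted.

```scala
// Sketch of a two-stage builder: the partially specified window is a distinct
// type, and only as() yields the complete OverWindow. Names are illustrative.
final case class OverWindow(orderBy: String, alias: String)

final class PartialOverWindow(orderBy: String) {
  // Completing step: the alias is mandatory by construction.
  def as(alias: String): OverWindow = OverWindow(orderBy, alias)
}

object Over {
  def orderBy(field: String): PartialOverWindow = new PartialOverWindow(field)
}

val w = Over.orderBy("rowtime").as("w")
```

A call that forgets `as()` now yields a `PartialOverWindow`, which simply does not type-check where an `OverWindow` is required.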




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977649#comment-15977649
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112459590
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
   def expandProjectList(
--- End diff --

I would not change the `expandProjectList()` method, which has a dedicated 
purpose.

Instead, I would add a new method to `ProjectionTranslator`, which just 
translates the over windows:

```
def translateOverWindows(
  exprs: Seq[Expression],
  overWindows: Array[OverWindow]): Seq[Expression]
```







[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112460885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
--- End diff --

The other expressions are resolved later. We should follow this pattern to 
keep the code base consistent.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112485134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
--- End diff --

This class should override `checkInputs()` and check that the interval 
configuration is valid (unbounded to current row, preceding to current row), and 
also perform the checks moved over from `table.scala`.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112559640
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
--- End diff --

Can we use `relBuilder.call(EventTimeExtractor)` and 
`relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL?




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112479103
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -586,6 +602,13 @@ trait ImplicitExpressionOperations {
  * to [[ImplicitExpressionOperations]].
  */
 trait ImplicitExpressionConversions {
+
+  implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue)
+  implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue)
+
+  implicit val CURRENT_ROW = toRowInterval(0L)
+  implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue)
--- End diff --

Isn't a range interval of `0` a valid range to use? It would mean all rows 
that arrived in the same millisecond as the current row, no?
Should we use `-1` as the constant for current row instead?
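The concern can be made concrete with a small sketch (the names and values are assumptions for illustration only): if `0` milliseconds is itself a legal range bound, it cannot double as the CURRENT ROW marker, whereas a reserved negative sentinel stays unambiguous.

```scala
// Illustration only: a 0 ms range bound means "rows in the same millisecond
// as the current row", so a reserved negative value encodes CURRENT ROW
// instead of overloading 0.
val CurrentRowSentinel: Long = -1L

def describeLowerBound(millis: Long): String =
  if (millis == CurrentRowSentinel) "CURRENT ROW"
  else s"$millis PRECEDING"
```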




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977672#comment-15977672
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112559640
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
--- End diff --

Can we use `relBuilder.call(EventTimeExtractor)` and 
`relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL?







[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112456485
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which over-based aggregates can be computed.
+*
+* Over window for batch tables are currently not supported.
+*
+* @param overWindows windows that specifies how elements are grouped.
+* @return Over windowed table
+*/
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw TableException("Over window for batch tables are currently not 
supported.")
+} else {
+  overWindows.foreach { overWindow =>
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+if (!orderName.equalsIgnoreCase("rowtime")
+  && !orderName.equalsIgnoreCase("proctime")) {
+  throw ValidationException(
+s"OrderBy expression must be ['rowtime] or ['proctime], but 
got ['${orderName}]")
+}
+  }
+}
+
+if (overWindows.size != 1) {
+  throw TableException("OverWindow only supported single window at 
current time.")
+}
+
+overWindows.foreach { overWindow =>
+  if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
--- End diff --

This check should be done in `OverCall.validateInput()` as well.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112476982
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -364,6 +365,21 @@ trait ImplicitExpressionOperations {
   def position(haystack: Expression) = Position(expr, haystack)
 
   /**
+* For windowing function to config over window
+* e.g.:
+* table
+*   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows 
following CURRENT_ROW as 'w)
+*   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
+*/
+  def over(alias: Expression) = {
+expr match {
+  case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], 
null, alias)
--- End diff --

Remove `new` (case classes should be instantiated with the `apply()` method, 
i.e., without `new`).
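The point in a two-line sketch, using a stand-in `OverCall` (not the real class):

```scala
// A case class gets a companion apply(), so `new` is redundant noise.
final case class OverCall(aggName: String, windowAlias: String)

val idiomatic = OverCall("count", "w")      // preferred: companion apply()
val verbose   = new OverCall("count", "w")  // same value, discouraged style
```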




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977664#comment-15977664
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112555311
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 ---
@@ -83,3 +83,49 @@ object Session {
 */
   def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
 }
+
+/**
+  * Helper object for creating a over window.
+  */
+object Over {
--- End diff --

I think we need an equivalent class for the Java Table API, similar to 
`org.apache.flink.table.api.java.groupWindows.scala`.







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977650#comment-15977650
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112460885
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
--- End diff --

The other expressions are resolved later. We should follow this pattern to 
keep the code base consistent.







[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112471307
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
+val candidates = 
childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
+
+val resolvedFieldReference = if (candidates.length > 1) {
--- End diff --

The aggregations are automatically resolved and validated if we add `agg` 
to the children of `OverCall`.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977663#comment-15977663
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112494282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
--- End diff --

`OverCall` should not implement its own `as` method. We should use the 
regular `Alias` expression for renaming expressions.







[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977682#comment-15977682
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112571486
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
 ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, 
ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+  * An over window specification.
+  *
+  * Over window is similar to the traditional OVER SQL.
+  */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: String): OverWindow = 
as(ExpressionParser.parseExpression(alias))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to.
+*
+* @param alias alias for this over window
+* @return this over window
+*/
+  def as(alias: Expression): OverWindow = {
+this.alias = alias
+this
+  }
+
+  /**
+* Partitions the elements on some partition keys.
+*
+* @param partitionBy
+* @return this over window
+*/
+  def partitionBy(partitionBy: String): OverWindow = {
+this.partitionBy(ExpressionParser.parseExpression(partitionBy))
+  }
+
+  /**
+* Partitions the elements on some partition keys.
+*
+* @param partitionBy
+* @return this over window
+*/
+  def partitionBy(partitionBy: Expression*): OverWindow = {
+this.partitionBy = partitionBy
+this
+  }
+
+
+  /**
+* Specifies the time mode.
+*
+* @param orderBy For streaming tables call [[orderBy 'rowtime or 
orderBy 'proctime]]
+*to specify time mode.
+* @return this over window
+*/
+  def orderBy(orderBy: String): OverWindow = {
+this.orderBy(ExpressionParser.parseExpression(orderBy))
+  }
+
+  /**
+* Specifies the time mode.
+*
+* @param orderBy For streaming tables call [[orderBy 'rowtime or 
orderBy 'proctime]]
+*to specify time mode.
+* @return this over window
+*/
+  def orderBy(orderBy: Expression): OverWindow = {
+this.orderBy = orderBy
+this
+  }
+
+  /**
+* Set the preceding offset (based on time or row-count intervals) for 
over window
+*
+* @param preceding forward offset that relative to the current row
+* @return this over window
+*/
+  def preceding(preceding: String): OverWindow = {
+this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+* Set the preceding offset (based on time or row-count intervals) for 
over window
+*
+* @param preceding forward offset that relative to the current row
+* @return this over window
+*/
+  def preceding(preceding: Expression): OverWindow = {
+this.preceding = preceding
+this
+  }
+
+  /**
+* Set the following offset (based on time or row-count intervals) for 
over window
+*
+* @param following subsequent offset that relative to the current row
+* @return this over window
+*/
+  def following(following: String): OverWindow = {
--- End diff --

Should we make `following` optional and default to CURRENT_ROW / 
CURRENT_RANGE depending on the type of `preceding`? I think that would be a 
nice shortcut and also be aligned with SQL.

What do you think @sunjincheng121 ?
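One way the proposed default could look, sketched with invented types (not Flink's actual expression classes): derive the missing `following` bound from the kind of the `preceding` interval, mirroring SQL's implicit `AND CURRENT ROW` upper bound.

```scala
// Illustrative model: row-count vs time intervals as a small ADT.
sealed trait Interval
final case class RowInterval(rows: Long) extends Interval
final case class TimeInterval(millis: Long) extends Interval

// Stand-ins for CURRENT_ROW / CURRENT_RANGE in this sketch.
val CurrentRow: Interval = RowInterval(0L)
val CurrentRange: Interval = TimeInterval(0L)

// If following was not specified, default it based on preceding's type:
// row-count windows end at CURRENT ROW, time windows at CURRENT RANGE.
def resolveFollowing(preceding: Interval, following: Option[Interval]): Interval =
  following.getOrElse(preceding match {
    case _: RowInterval  => CurrentRow
    case _: TimeInterval => CurrentRange
  })
```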



[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977670#comment-15977670
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112494372
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
--- End diff --

The `aggAlias` should be removed.


> Integrating the OVER windows in the Table API
> -
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>.overWindows(
> (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy 
> order_by_expression] 
>   (preceding  
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>  [following 
> UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
> as alias,...[n])
>)
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single 
> threaded execution).
> * The ORDER BY clause: before the 
> [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation, 
> orderBy may only reference 'rowtime / 'proctime (for streaming) or a 
> specific time field (for batch).
> * FOLLOWING is not supported.
> User interface design document [See | 
> https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
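
Under the restrictions above, the streaming `orderBy` validation could be sketched as follows (plain Scala; the function name and error messages are illustrative assumptions, not the actual Flink implementation):

```scala
// Illustrative check of the orderBy restriction: on streaming tables only a
// single time column ('rowtime or 'proctime) may be used for ordering.
def validateStreamOrderBy(orderKeys: Seq[String]): Either[String, String] =
  orderKeys match {
    case Seq(k) if k.equalsIgnoreCase("rowtime") || k.equalsIgnoreCase("proctime") =>
      Right(k)
    case Seq(_) =>
      Left("Unsupported use of OVER windows. orderBy must be 'rowtime or 'proctime.")
    case _ =>
      Left("Unsupported use of OVER windows. The window can only be ordered by a single time column.")
  }
```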



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112460318
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
 projectList += unresolved
 }
 
+  case OverCall(agg, aggAlias, alias, _) =>
+val overWindow = overWindows.find(_.alias.equals(alias))
+val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+val childrenOutput = parent.output
--- End diff --

`childOutput` or `parentOutput`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112494282
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
--- End diff --

`OverCall` should not implement its own `as` method. We should use the 
regular `Alias` expression for renaming expressions.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112567381
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
--- End diff --

`precedingValue` -> `value`




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977656#comment-15977656
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112479103
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -586,6 +602,13 @@ trait ImplicitExpressionOperations {
  * to [[ImplicitExpressionOperations]].
  */
 trait ImplicitExpressionConversions {
+
+  implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue)
+  implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue)
+
+  implicit val CURRENT_ROW = toRowInterval(0L)
+  implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue)
--- End diff --

Isn't a range interval of `0` a valid range to use? It would mean all rows 
that arrived in the same millisecond as the current row, no?
Should we use `-1` as the constant for current row instead?
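
The sentinel encoding under discussion can be sketched like this (illustrative plain Scala; using `-1` as the CURRENT_ROW/CURRENT_RANGE marker is the proposal from this comment, not the merged behavior):

```scala
// Illustrative mapping of the sentinel Long values behind the constants above
// to window bound kinds, with -1 as the proposed marker for CURRENT ROW/RANGE.
sealed trait BoundKind
case object Unbounded extends BoundKind
case object Current extends BoundKind
case class Bounded(value: Long) extends BoundKind

def boundKind(value: Long): BoundKind = value match {
  case Long.MaxValue => Unbounded    // UNBOUNDED_ROW / UNBOUNDED_RANGE
  case -1L           => Current      // proposed CURRENT_ROW / CURRENT_RANGE
  case n             => Bounded(n)   // e.g. 10.rows or 10.seconds
}
```

With this encoding, a bounded range of `0` stays distinguishable from CURRENT, which is the point of the question above.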




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570409
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala
 ---
@@ -23,7 +23,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, 
RelDataTypeFieldImpl}
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.core.Window.Group
 import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rex.{RexInputRef}
+import org.apache.calcite.rex.{RexInputRef, RexLiteral}
--- End diff --

All these changes could be undone if we didn't use a literal for the 
`orderBy()` expression.




[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API

2017-04-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977666#comment-15977666
 ] 

ASF GitHub Bot commented on FLINK-6228:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112469357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
+  val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+  create(unbounded, null)
+} else if (precedingValue == 0L) {
+  val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+  create(currentRow, null)
+} else {
+
+  val returnType = new BasicSqlType(
+relBuilder.getTypeFactory.getTypeSystem,
+SqlTypeName.DECIMAL)
+
+  val sqlOperator = new SqlPostfixOperator(
+sqlKind.name,
+sqlKind,
+2,
+new OrdinalReturnTypeInference(0),
+null,
+null)
+
+  val operands: Array[SqlNode] = new Array[SqlNode](1)
+  operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
+
+  val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+  val expressions: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+  expressions.add(relBuilder.literal(precedingValue))
+
+  val rexNode = relBuilder.getRexBuilder.makeCall(returnType, 
sqlOperator, expressions)
+
+  create(node, rexNode)
+}
+  }
+
+  override private[flink] def children: Seq[Expression] = Seq()
--- End diff --

change to `Seq(agg)` to automatically 

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570917
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
 ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import 
org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, 
StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+val data = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (3L, 3, "Hello"),
+  (4L, 4, "Hello"),
+  (5L, 5, "Hello"),
+  (6L, 6, "Hello"),
+  (7L, 7, "Hello World"),
+  (8L, 8, "Hello World"),
+  (20L, 20, "Hello World"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+val windowedTable = table
+  .window(
+Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW 
following CURRENT_ROW as 'w)
+  .select('c, 'b.count over 'w as 'mycount)
+  .select('c, 'mycount)
+
+val results = windowedTable.toDataStream[Row]
+results.addSink(new StreamITCase.StringSink)
+env.execute()
+
+val expected = Seq(
+  "Hello World,1", "Hello World,2", "Hello World,3",
+  "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+env.setParallelism(1)
+
+val data = Seq(
+  Left(1405L, (1, 1L, "Hi")),
+  Left(1400L, (2, 1L, "Hello")),
+  Left(1402L, (1, 1L, "Hello")),
+  Left(1402L, (1, 2L, "Hello")),
+  Left(1402L, (1, 3L, "Hello world")),
+  Left(1403L, (2, 2L, "Hello world")),
+  Left(1403L, (2, 3L, "Hello world")),
+  Right(1420L),
+  Left(1421L, (1, 4L, "Hello world")),
+  Left(1422L, (1, 5L, "Hello world")),
+  Left(1422L, (1, 6L, "Hello world")),
+  Left(1422L, (1, 7L, "Hello world")),
+  Left(1423L, (2, 4L, "Hello world")),
+  Left(1423L, (2, 5L, "Hello world")),
+  Right(1430L)
+)
+val table = env
+  .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
+  .toTable(tEnv).as('a, 'b, 'c)
+
+val windowedTable = table
+  .window(Over partitionBy 'a orderBy 

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112499806
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
 
 val orderKeys = overWindow.orderKeys.getFieldCollations
 
-if (orderKeys.size() != 1) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
by a single time column.")
-}
-val orderKey = orderKeys.get(0)
+val timeType = if (!orderKeys.isEmpty) {
+  if (orderKeys.size() != 1) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
by a single time " +
+"column.")
+  }
+  val orderKey = orderKeys.get(0)
 
-if (!orderKey.direction.equals(ASCENDING)) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  if (!orderKey.direction.equals(ASCENDING)) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  }
+  inputType
+.getFieldList
+.get(orderKey.getFieldIndex)
+.getValue.asInstanceOf[TimeModeType]
+} else {
+  val it = logicWindow.constants.listIterator()
+  if (it.hasNext) {
+val item = it.next().getValue
+if (item.isInstanceOf[NlsString]) {
+  val value = item.asInstanceOf[NlsString].getValue
+  if (value.equalsIgnoreCase("rowtime")) {
+new RowTimeType
+  } else {
+new ProcTimeType
+  }
+}
+  }
 }
 
 val inputDS = 
input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
 val generator = new CodeGenerator(
-  tableEnv.getConfig,
-  false,
-  inputDS.getType)
-
-val timeType = inputType
-  .getFieldList
-  .get(orderKey.getFieldIndex)
-  .getValue
+tableEnv.getConfig,
--- End diff --

indent




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112555311
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
 ---
@@ -83,3 +83,49 @@ object Session {
 */
   def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
 }
+
+/**
+  * Helper object for creating a over window.
+  */
+object Over {
--- End diff --

I think we need an equivalent class for the Java Table API, similar to 
`org.apache.flink.table.api.java.groupWindows.scala`.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570230
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
 ---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
 
 val orderKeys = overWindow.orderKeys.getFieldCollations
 
-if (orderKeys.size() != 1) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
by a single time column.")
-}
-val orderKey = orderKeys.get(0)
+val timeType = if (!orderKeys.isEmpty) {
+  if (orderKeys.size() != 1) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
by a single time " +
+"column.")
+  }
+  val orderKey = orderKeys.get(0)
 
-if (!orderKey.direction.equals(ASCENDING)) {
-  throw new TableException(
-"Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  if (!orderKey.direction.equals(ASCENDING)) {
+throw new TableException(
+  "Unsupported use of OVER windows. The window can only be ordered 
in ASCENDING mode.")
+  }
+  inputType
+.getFieldList
+.get(orderKey.getFieldIndex)
+.getValue.asInstanceOf[TimeModeType]
+} else {
--- End diff --

this could be removed if we don't use literals for the orderBy() field




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r11245
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
--- End diff --

Extend docs to

```
  /**
* Defines over-windows on the records of a table.
*
* An over-window defines for each record an interval of records over 
which aggregation 
* functions can be computed.
*
* Example:
*
* {{{
*   table
* .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds 
as 'ow)
* .select('c, 'b.count over 'ow, 'e.sum over 'ow)
* }}}
*
* __Note__: Computing over window aggregates on a streaming table is only a
* parallel operation if the window is partitioned. Otherwise, the whole
* stream will be processed by a single task, i.e., with parallelism 1.
*
* __Note__: Over-windows for batch tables are currently not supported.
*
* @param overWindows windows that specify the record interval over 
which aggregations are
*computed.
* @return An OverWindowedTable to specify the aggregations.
*/
```




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112473915
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
 ---
@@ -810,6 +810,47 @@ class Table(
 new WindowedTable(this, window)
   }
 
+  /**
+* Groups the records of a table by assigning them to windows defined 
by a time or row interval.
+*
+* For streaming tables of infinite size, grouping into windows is 
required to define finite
+* groups on which over-based aggregates can be computed.
+*
+* Over window for batch tables are currently not supported.
+*
+* @param overWindows windows that specifies how elements are grouped.
+* @return Over windowed table
+*/
+  def window(overWindows: OverWindow*): OverWindowedTable = {
+
+if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw TableException("Over window for batch tables are currently not 
supported.")
+} else {
+  overWindows.foreach { overWindow =>
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
--- End diff --

This check should be done in `OverCall.validateInput()`




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112557404
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
--- End diff --

Add a space: `val sets: util.HashSet`




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112570095
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
+  val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+  create(unbounded, null)
+} else if (precedingValue == 0L) {
+  val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+  create(currentRow, null)
+} else {
+
+  val returnType = new BasicSqlType(
+relBuilder.getTypeFactory.getTypeSystem,
+SqlTypeName.DECIMAL)
+
+  val sqlOperator = new SqlPostfixOperator(
+sqlKind.name,
+sqlKind,
+2,
+new OrdinalReturnTypeInference(0),
+null,
+null)
+
+  val operands: Array[SqlNode] = new Array[SqlNode](1)
+  operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
+
+  val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+  val expressions: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+  expressions.add(relBuilder.literal(precedingValue))
+
+  val rexNode = relBuilder.getRexBuilder.makeCall(returnType, 
sqlOperator, expressions)
+
+  create(node, rexNode)
+}
+  }
+
+  override private[flink] def children: Seq[Expression] = Seq()
--- End diff --

maybe we can check the partitionBy expressions here as well?



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112495783
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
 ---
@@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], 
child: LogicalNode) extend
 case expr if !expr.valid => u
 case c @ Cast(ne: NamedExpression, tp) => Alias(c, 
s"${ne.name}-$tp")
 case gcf: GetCompositeField => Alias(gcf, 
gcf.aliasName().getOrElse(s"_c$i"))
+case over: OverCall if null != over.aggAlias =>
--- End diff --

We don't need this if we handle the aggregation alias as regular `Alias` 
expression




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112457606
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
   def expandProjectList(
   exprs: Seq[Expression],
   parent: LogicalNode,
-  tableEnv: TableEnvironment)
-: Seq[Expression] = {
+  tableEnv: TableEnvironment,
+  overWindows: OverWindow*): Seq[Expression] = {
--- End diff --

Please use `Array` instead of varargs.
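
A minimal, self-contained sketch (with hypothetical stand-in types, not the real Table API classes) of the suggested signature change. With varargs the `overWindows` argument can be omitted entirely at the call site, silently passing an empty sequence; an explicit `Array` parameter forces the caller to state the windows, even when there are none:

```scala
case class Expression(name: String)
case class OverWindow(alias: String)

object ProjectionTranslator {
  // Varargs version: `expandProjectListVarargs(exprs)` compiles without windows.
  def expandProjectListVarargs(
      exprs: Seq[Expression],
      overWindows: OverWindow*): Seq[Expression] =
    exprs

  // Array version: the windows must be passed explicitly, e.g. Array.empty.
  def expandProjectList(
      exprs: Seq[Expression],
      overWindows: Array[OverWindow]): Seq[Expression] =
    exprs
}
```

Call sites then read `expandProjectList(exprs, Array(OverWindow("w")))` rather than relying on an implicit empty varargs sequence.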




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112495160
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
--- End diff --

The proper translation can be done in 
`ProjectTranslator.translateOverWindow()`




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112501222
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.typeutils
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.TableException
+
+object SqlTypeUtils {
--- End diff --

Util class can be removed. We can create a `RelDataType` from a 
`TypeInformation` as follows:

```

relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo)
```




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112469357
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
+  val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+  create(unbounded, null)
+} else if (precedingValue == 0L) {
+  val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
+  create(currentRow, null)
+} else {
+
+  val returnType = new BasicSqlType(
+relBuilder.getTypeFactory.getTypeSystem,
+SqlTypeName.DECIMAL)
+
+  val sqlOperator = new SqlPostfixOperator(
+sqlKind.name,
+sqlKind,
+2,
+new OrdinalReturnTypeInference(0),
+null,
+null)
+
+  val operands: Array[SqlNode] = new Array[SqlNode](1)
+  operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
+
+  val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
+
+  val expressions: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+  expressions.add(relBuilder.literal(precedingValue))
+
+  val rexNode = relBuilder.getRexBuilder.makeCall(returnType, 
sqlOperator, expressions)
+
+  create(node, rexNode)
+}
+  }
+
+  override private[flink] def children: Seq[Expression] = Seq()
--- End diff --

change to `Seq(agg)` to automatically validate the aggregation call and its 
arguments.



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112488004
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
 ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends 
UnaryExpression {
 * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
 */
   private[flink] def toAggCall(name: String)(implicit relBuilder: 
RelBuilder): AggCall
+
+  /**
+* Because SqlAggFunction from Calcite's AggCallImpl is invisible,
--- End diff --

Thanks for the explanation for this method.
Can you update the doc to `Returns the SqlAggFunction for this Aggregation` 
and change the method name to `getSqlAggFunction()`? The method does not 
convert the Aggregation because the returned SqlAggFunction does not contain 
the arguments.




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112564205
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
--- End diff --

We also need to validate other properties of the overWindow:
- check that the partitionBy expressions are valid fields in the input
- if a partitionBy expression is not a field, we would need to push the expression into a Project before the Project with the overWindow and reference the new field. This would need to happen in `Table.window()`. I think for now we can have the restriction that only field references are allowed expressions
- validate that `preceding` and `following` are literals
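
The checks suggested above could look roughly like this minimal sketch, using hypothetical stand-in expression types rather than Flink's real ones:

```scala
sealed trait Expr
case class FieldRef(name: String) extends Expr
case class Lit(value: Any) extends Expr
case class Call(name: String, args: Seq[Expr]) extends Expr

case class OverWindowSketch(partitionBy: Seq[Expr], preceding: Expr, following: Expr)

// Returns the first validation failure, or None if the window is acceptable.
def validate(w: OverWindowSketch): Option[String] = {
  // restriction: only plain field references are allowed in partitionBy
  val nonField = w.partitionBy
    .find(e => !e.isInstanceOf[FieldRef])
    .map(e => s"partitionBy only supports field references, got: $e")
  // preceding and following must be literals
  val nonLiteral = Seq(w.preceding, w.following)
    .find(e => !e.isInstanceOf[Lit])
    .map(e => s"preceding/following must be literals, got: $e")
  nonField.orElse(nonLiteral)
}
```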


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112561580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
--- End diff --

+space `foreach(x =>`




[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

2017-04-20 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3743#discussion_r112567644
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: 
Seq[Expression]) extends Expression
 }
 
 /**
+  * Over expression for calcite over transform.
+  *
+  * @param agg over-agg expression
+  * @param aggAliasagg alias for following `select()` clause.
+  * @param overWindowAlias over window alias
+  * @param overWindow  over window
+  */
+case class OverCall(
+agg: Aggregation,
+var aggAlias: Expression,
+overWindowAlias: Expression,
+var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
+this.aggAlias = aggAlias
+this
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+
+val rexBuilder = relBuilder.getRexBuilder
+
+val operator: SqlAggFunction = agg.toSqlAggFunction()
+
+val aggReturnType: TypeInformation[_] = agg.resultType
+
+val relDataType = 
SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
+
+val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
+val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
+
+aggExprs.add(relBuilder.field(aggChildName))
+
+val orderKeys: ImmutableList.Builder[RexFieldCollation] =
+  new ImmutableList.Builder[RexFieldCollation]()
+
+val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
+val orderName = 
overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
+
+val rexNode =
+  if (orderName.equalsIgnoreCase("rowtime") || 
orderName.equalsIgnoreCase("proctime")) {
+// for stream
+relBuilder.literal(orderName)
+  } else {
+// for batch
+relBuilder.field(orderName)
+  }
+
+orderKeys.add(new RexFieldCollation(rexNode,sets))
+
+val partitionKeys: util.ArrayList[RexNode] = new 
util.ArrayList[RexNode]()
+overWindow.partitionBy.foreach(x=>
+  
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
+
+val preceding = overWindow.preceding.asInstanceOf[Literal]
+val following = overWindow.following.asInstanceOf[Literal]
+
+val isPhysical: Boolean = 
preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
+
+val lowerBound = createBound(relBuilder, 
preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
+val upperBound = createBound(relBuilder, 
following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
+
+rexBuilder.makeOver(
+  relDataType,
+  operator,
+  aggExprs,
+  partitionKeys,
+  orderKeys.build,
+  lowerBound,
+  upperBound,
+  isPhysical,
+  true,
+  false)
+  }
+
+  private def createBound(
+relBuilder: RelBuilder,
+precedingValue: Long,
+sqlKind: SqlKind): RexWindowBound = {
+
+if (precedingValue == Long.MaxValue) {
--- End diff --

Please use the constants defined in `expressionDsl.scala` for the checks
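
A sketch of the idea, with assumed names: the `Long` values below are only stand-ins illustrating how `createBound` could compare against named constants (the real ones live in `expressionDsl.scala`) instead of the magic values `Long.MaxValue` and `0L`:

```scala
object BoundConstants {
  val UNBOUNDED: Long = Long.MaxValue // stand-in for the UNBOUNDED_* constant
  val CURRENT: Long = 0L              // stand-in for the CURRENT_* constant
}

// Stable-identifier patterns make the special bound values self-documenting.
def describeBound(precedingValue: Long): String =
  precedingValue match {
    case BoundConstants.UNBOUNDED => "UNBOUNDED PRECEDING"
    case BoundConstants.CURRENT   => "CURRENT ROW"
    case n                        => s"$n PRECEDING"
  }
```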

