[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface
[ https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15978103#comment-15978103 ] Chesnay Schepler commented on FLINK-5095: - It's less maintainable. By introducing a set of generic primitives we can add more metric types without having to change all reporters. Take a timer, for example. If we introduced it now, not a single existing reporter could report it; every reporter would have to be updated. In contrast, if the timer were just another generic "NumberMetric", it would be instantly supported. Furthermore, we would no longer have the generic Gauge type, which is a pain to deal with: if you can't handle non-numeric gauges, you can't simply ignore them in notify, since a gauge might return null and you can't check the type of null. You can only do that (somewhat reliably) in report(). > Add explicit notifyOfAddedX methods to MetricReporter interface > --- > > Key: FLINK-5095 > URL: https://issues.apache.org/jira/browse/FLINK-5095 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Chesnay Schepler >Priority: Minor > > I would like to start a discussion on the MetricReporter interface, > specifically the methods that notify a reporter of added or removed metrics. > Currently, the methods are defined as follows: > {code} > void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group); > void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup > group); > {code} > All metrics, regardless of their actual type, are passed to the reporter with > these methods. 
> Since the different metric types have to be handled differently we thus force > every reporter to do something like this: > {code} > if (metric instanceof Counter) { > Counter c = (Counter) metric; > // deal with counter > } else if (metric instanceof Gauge) { > // deal with gauge > } else if (metric instanceof Histogram) { > // deal with histogram > } else if (metric instanceof Meter) { > // deal with meter > } else { > // log something or throw an exception > } > {code} > This has a few issues: > * the instanceof checks and casts are unnecessary overhead > * it requires the implementer to be aware of every metric type > * it encourages throwing an exception in the final else block > We could remedy all of these by reworking the interface to contain explicit > add/remove methods for every metric type. This would however be a breaking > change and blow up the interface to 12 methods from the current 4. We could > also add a RichMetricReporter interface with these methods, which would > require relatively few changes but add additional complexity. > I was wondering what other people think about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
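A minimal, self-contained sketch of the "explicit methods" variant under discussion. Note the assumptions: the `Metric`, `Counter`, and `Gauge` interfaces below are simplified stand-ins for Flink's real `org.apache.flink.metrics` types, and `RichMetricReporter` is the hypothetical interface being proposed, not existing API:

```java
import java.util.ArrayList;
import java.util.List;

public class RichReporterSketch {
    // Simplified stand-ins for Flink's metric types (assumptions, not the real interfaces).
    interface Metric {}
    interface Counter extends Metric { long getCount(); }
    interface Gauge<T> extends Metric { T getValue(); }

    // Proposed shape: one explicit notify method per metric type, so reporters
    // need no instanceof chain and unknown types simply never reach them.
    interface RichMetricReporter {
        void notifyOfAddedCounter(Counter counter, String name);
        void notifyOfAddedGauge(Gauge<?> gauge, String name);
        // ...notifyOfAddedHistogram / notifyOfAddedMeter and the matching
        // notifyOfRemovedX methods would follow the same pattern.
    }

    // A trivial reporter that records what it was told about, with no casts.
    static class RecordingReporter implements RichMetricReporter {
        final List<String> seen = new ArrayList<>();
        @Override public void notifyOfAddedCounter(Counter c, String name) {
            seen.add("counter:" + name + "=" + c.getCount());
        }
        @Override public void notifyOfAddedGauge(Gauge<?> g, String name) {
            seen.add("gauge:" + name + "=" + g.getValue());
        }
    }

    public static void main(String[] args) {
        RecordingReporter reporter = new RecordingReporter();
        reporter.notifyOfAddedCounter(() -> 42L, "numRecordsIn");
        reporter.notifyOfAddedGauge((Gauge<String>) () -> "RUNNING", "jobState");
        System.out.println(reporter.seen); // [counter:numRecordsIn=42, gauge:jobState=RUNNING]
    }
}
```

The trade-off named in the ticket is visible here: each metric type adds two interface methods, but the per-type dispatch and the final else block disappear from every reporter.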
[jira] [Comment Edited] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998 ] mingleizhang edited comment on FLINK-6351 at 4/21/17 3:10 AM: -- [~till.rohrmann] Hi, Till. Please take a look and let me know what you think. If there are no further questions, I will start working on this soon. Thanks. was (Author: mingleizhang): [~till.rohrmann] Hi, Till. Please take a look and let me know what you think. Thanks. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > Currently, YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get rid > of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
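The motivation behind the proposed refactoring — eager construction removing the lock in `run()` — can be sketched in plain Java. The class and field names below are illustrative only, not Flink's actual runner code:

```java
public class RunnerSketch {

    // Before: services are created lazily inside run(), so a lock is needed
    // to guard initialization against concurrent callers.
    static class LazyRunner {
        private final Object lock = new Object();
        private String services; // stands in for RPC / HA services etc.

        int run() {
            synchronized (lock) {
                if (services == null) {
                    services = "initialized";
                }
            }
            return services.length();
        }
    }

    // After: everything is instantiated in the constructor, so run() operates
    // only on final, fully constructed state and needs no synchronization.
    static class EagerRunner {
        private final String services;

        EagerRunner() {
            this.services = "initialized"; // instantiate services up front
        }

        int run() {
            return services.length();
        }
    }

    public static void main(String[] args) {
        System.out.println(new LazyRunner().run());  // 11
        System.out.println(new EagerRunner().run()); // 11
    }
}
```

Both runners produce the same result; the eager variant simply moves all mutation before `run()` is ever callable, which is what folding the abstract base class into the concrete runner would enable.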
[jira] [Commented] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977998#comment-15977998 ] mingleizhang commented on FLINK-6351: - [~till.rohrmann] Hi, Till. Please take a look and let me know what you think. Thanks. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > Currently, YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get rid > of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6351: Description: Currently, YarnFlinkApplicationMasterRunner inherits from AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We could combine the two classes and then instantiate the services and runtime components in the constructor of YarnFlinkApplicationMasterRunner to get rid of the lock in the run method. > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > > Currently, YarnFlinkApplicationMasterRunner inherits from > AbstractYarnFlinkApplicationMasterRunner, which seems a bit unnecessary. We > could combine the two classes and then instantiate the services and runtime > components in the constructor of YarnFlinkApplicationMasterRunner to get rid > of the lock in the run method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
[ https://issues.apache.org/jira/browse/FLINK-6351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang updated FLINK-6351: Priority: Major (was: Minor) > Refactoring YarnFlinkApplicationMasterRunner by combining > AbstractYarnFlinkApplicationMasterRunner in one class > --- > > Key: FLINK-6351 > URL: https://issues.apache.org/jira/browse/FLINK-6351 > Project: Flink > Issue Type: Improvement > Components: YARN >Reporter: mingleizhang >Assignee: mingleizhang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6351) Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class
mingleizhang created FLINK-6351: --- Summary: Refactoring YarnFlinkApplicationMasterRunner by combining AbstractYarnFlinkApplicationMasterRunner in one class Key: FLINK-6351 URL: https://issues.apache.org/jira/browse/FLINK-6351 Project: Flink Issue Type: Improvement Components: YARN Reporter: mingleizhang Assignee: mingleizhang Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment
[ https://issues.apache.org/jira/browse/FLINK-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-6350: Component/s: Streaming > Flink: Windowing does not work with streams from collections and the local > execution environment > > > Key: FLINK-6350 > URL: https://issues.apache.org/jira/browse/FLINK-6350 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.2.0 >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.2.2 > > > When using events via the {{fromCollection}} method of > {{StreamExecutionEnvironment}}, window timing is not supported. The time > windows close immediately. > This is unfortunate because mocking events from collections and testing them > locally is a powerful way to unit test stream processors. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6350) Flink: Windowing does not work with streams from collections and the local execution environment
Bowen Li created FLINK-6350: --- Summary: Flink: Windowing does not work with streams from collections and the local execution environment Key: FLINK-6350 URL: https://issues.apache.org/jira/browse/FLINK-6350 Project: Flink Issue Type: Bug Affects Versions: 1.2.0 Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.2.2 When using events via the {{fromCollection}} method of {{StreamExecutionEnvironment}}, window timing is not supported. The time windows close immediately. This is unfortunate because mocking events from collections and testing them locally is a powerful way to unit test stream processors. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface
[ https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977893#comment-15977893 ] Bowen Li edited comment on FLINK-5095 at 4/21/17 1:02 AM: -- What's the main disadvantage of having counters, meters, and gauges handled separately, besides the increased number of API methods? was (Author: phoenixjiangnan): What's the main disadvantage of having counters, meters, and gauges separately, besides the increased number of API methods? > Add explicit notifyOfAddedX methods to MetricReporter interface > --- > > Key: FLINK-5095 > URL: https://issues.apache.org/jira/browse/FLINK-5095 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Chesnay Schepler >Priority: Minor > > I would like to start a discussion on the MetricReporter interface, > specifically the methods that notify a reporter of added or removed metrics. > Currently, the methods are defined as follows: > {code} > void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group); > void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup > group); > {code} > All metrics, regardless of their actual type, are passed to the reporter with > these methods. 
> Since the different metric types have to be handled differently we thus force > every reporter to do something like this: > {code} > if (metric instanceof Counter) { > Counter c = (Counter) metric; > // deal with counter > } else if (metric instanceof Gauge) { > // deal with gauge > } else if (metric instanceof Histogram) { > // deal with histogram > } else if (metric instanceof Meter) { > // deal with meter > } else { > // log something or throw an exception > } > {code} > This has a few issues > * the instanceof checks and castings are unnecessary overhead > * it requires the implementer to be aware of every metric type > * it encourages throwing an exception in the final else block > We could remedy all of these by reworking the interface to contain explicit > add/remove methods for every metric type. This would however be a breaking > change and blow up the interface to 12 methods from the current 4. We could > also add a RichMetricReporter interface with these methods, which would > require relatively little changes but add additional complexity. > I was wondering what other people think about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5095) Add explicit notifyOfAddedX methods to MetricReporter interface
[ https://issues.apache.org/jira/browse/FLINK-5095?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977893#comment-15977893 ] Bowen Li commented on FLINK-5095: - What's the main disadvantage of having counters, meters, and gauges handled separately, besides the increased number of API methods? > Add explicit notifyOfAddedX methods to MetricReporter interface > --- > > Key: FLINK-5095 > URL: https://issues.apache.org/jira/browse/FLINK-5095 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.1.3 >Reporter: Chesnay Schepler >Priority: Minor > > I would like to start a discussion on the MetricReporter interface, > specifically the methods that notify a reporter of added or removed metrics. > Currently, the methods are defined as follows: > {code} > void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group); > void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup > group); > {code} > All metrics, regardless of their actual type, are passed to the reporter with > these methods. > Since the different metric types have to be handled differently we thus force > every reporter to do something like this: > {code} > if (metric instanceof Counter) { > Counter c = (Counter) metric; > // deal with counter > } else if (metric instanceof Gauge) { > // deal with gauge > } else if (metric instanceof Histogram) { > // deal with histogram > } else if (metric instanceof Meter) { > // deal with meter > } else { > // log something or throw an exception > } > {code} > This has a few issues > * the instanceof checks and castings are unnecessary overhead > * it requires the implementer to be aware of every metric type > * it encourages throwing an exception in the final else block > We could remedy all of these by reworking the interface to contain explicit > add/remove methods for every metric type. This would however be a breaking > change and blow up the interface to 12 methods from the current 4. 
We could > also add a RichMetricReporter interface with these methods, which would > require relatively little changes but add additional complexity. > I was wondering what other people think about this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6309) Memory consumer weights should be calculated in job vertex level
[ https://issues.apache.org/jira/browse/FLINK-6309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977892#comment-15977892 ] Xu Pingyong commented on FLINK-6309: Memory consumer weights should be considered not only in terms of chaining, but also in terms of slot sharing. > Memory consumer weights should be calculated in job vertex level > > > Key: FLINK-6309 > URL: https://issues.apache.org/jira/browse/FLINK-6309 > Project: Flink > Issue Type: Improvement > Components: Optimizer >Reporter: Kurt Young >Assignee: Xu Pingyong > > Currently in {{PlanFinalizer}}, we traverse all job vertices to calculate > the memory consumer weights, and then assign the weights to each job vertex. > In the case of a large job graph, e.g. with multiple joins and group reduces, > the consumer weights will be high and the usable memory for each job vertex > will be very low. > I think it makes more sense to calculate the memory consumer weights at the > job vertex level (after chaining) to maximize memory utilization. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977891#comment-15977891 ] ASF GitHub Bot commented on FLINK-6225: --- GitHub user PangZhi opened a pull request: https://github.com/apache/flink/pull/3748 [FLINK-6225] [Cassandra Connector] add CassandraTableSink This PR is to address https://issues.apache.org/jira/browse/FLINK-6225 - add Row type support for CassandraSink - add StreamTableSink implementation for Cassandra You can merge this pull request into a Git repository by running: $ git pull https://github.com/PangZhi/flink FLINK-6225_add_cassandra_table_sink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3748 commit 8bd2816a5d5999575bd98b5d3096af5f36f6f173 Author: Jing Fan Date: 2017-04-21T00:54:36Z [FLINK-6225] [Cassandra Connector] add CassandraTableSink > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.3.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
GitHub user PangZhi opened a pull request: https://github.com/apache/flink/pull/3748 [FLINK-6225] [Cassandra Connector] add CassandraTableSink This PR is to address https://issues.apache.org/jira/browse/FLINK-6225 - add Row type support for CassandraSink - add StreamTableSink implementation for Cassandra You can merge this pull request into a Git repository by running: $ git pull https://github.com/PangZhi/flink FLINK-6225_add_cassandra_table_sink Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3748 commit 8bd2816a5d5999575bd98b5d3096af5f36f6f173 Author: Jing Fan Date: 2017-04-21T00:54:36Z [FLINK-6225] [Cassandra Connector] add CassandraTableSink --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-6349) Enforce per-subtask record ordering on resharding for FlinkKinesisConsumer
Tzu-Li (Gordon) Tai created FLINK-6349: -- Summary: Enforce per-subtask record ordering on resharding for FlinkKinesisConsumer Key: FLINK-6349 URL: https://issues.apache.org/jira/browse/FLINK-6349 Project: Flink Issue Type: Improvement Components: Kinesis Connector Reporter: Tzu-Li (Gordon) Tai As described in FLINK-6316, currently the Kinesis consumer does not provide any ordering guarantees when resharding occurs. While this cannot be enforced globally (i.e. if a merged / split shard's child shard ends up in a different subtask, we cannot do any coordination for ordering guarantee), we can definitely enforce this locally for each subtask. Simply put, we can still locally enforce ordering by making sure that discovered child shards are consumed only after any of its parent shards that were on the same subtask are fully consumed. To do this, we would also need to add "parent shard" information to {{KinesisStreamShard}} (Flink's representation of Kinesis shards). This would be directly beneficial for per-shard watermarks (FLINK-5697) to retain per-shard time characteristics after a reshard, and therefore can be seen as a prerequisite. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
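The per-subtask ordering rule described above can be modeled in a few lines. Note the assumptions: the shard IDs and the parent map are invented for illustration, and carrying parent-shard information on the shard representation is exactly what the ticket proposes to add, not something the consumer already does:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ReshardOrderingSketch {

    /**
     * Decides whether a discovered child shard may be consumed yet: only when
     * every parent shard that THIS subtask owns has been fully consumed.
     * Parents owned by other subtasks are ignored, which is why the guarantee
     * is local to a subtask rather than global.
     */
    static boolean canConsume(String shard,
                              Map<String, List<String>> parents,
                              Set<String> ownedShards,
                              Set<String> fullyConsumed) {
        for (String parent : parents.getOrDefault(shard, List.of())) {
            if (ownedShards.contains(parent) && !fullyConsumed.contains(parent)) {
                return false; // a locally owned parent still has records to read
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // shard-3 is the result of merging shard-1 and shard-2;
        // this subtask owns shard-1 and shard-3, shard-2 lives elsewhere.
        Map<String, List<String>> parents =
            Map.of("shard-3", Arrays.asList("shard-1", "shard-2"));
        Set<String> owned = new HashSet<>(Arrays.asList("shard-1", "shard-3"));
        Set<String> done = new HashSet<>();

        System.out.println(canConsume("shard-3", parents, owned, done)); // false
        done.add("shard-1"); // once the locally owned parent is drained...
        System.out.println(canConsume("shard-3", parents, owned, done)); // true
    }
}
```

This mirrors the ticket's point: no coordination is needed across subtasks, only a local check against parent shards before a child shard's consumer is started.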
[jira] [Created] (FLINK-6348) Migrate from Java serialization for RollingSink's state
Tzu-Li (Gordon) Tai created FLINK-6348: -- Summary: Migrate from Java serialization for RollingSink's state Key: FLINK-6348 URL: https://issues.apache.org/jira/browse/FLINK-6348 Project: Flink Issue Type: Sub-task Reporter: Tzu-Li (Gordon) Tai See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for {{RollingSink}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6347) Migrate from Java serialization for MessageAcknowledgingSourceBase's state
Tzu-Li (Gordon) Tai created FLINK-6347: -- Summary: Migrate from Java serialization for MessageAcknowledgingSourceBase's state Key: FLINK-6347 URL: https://issues.apache.org/jira/browse/FLINK-6347 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Tzu-Li (Gordon) Tai See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for {{MessageAcknowledgingSourceBase}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6346) Migrate from Java serialization for GenericWriteAheadSink's state
Tzu-Li (Gordon) Tai created FLINK-6346: -- Summary: Migrate from Java serialization for GenericWriteAheadSink's state Key: FLINK-6346 URL: https://issues.apache.org/jira/browse/FLINK-6346 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Tzu-Li (Gordon) Tai See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for {{GenericWriteAheadSink}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6345) Migrate from Java serialization for ContinuousFileReaderOperator's state
Tzu-Li (Gordon) Tai created FLINK-6345: -- Summary: Migrate from Java serialization for ContinuousFileReaderOperator's state Key: FLINK-6345 URL: https://issues.apache.org/jira/browse/FLINK-6345 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Tzu-Li (Gordon) Tai See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for {{ContinuousFileReaderOperator}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6344) Migrate from Java serialization for BucketingSink's state
[ https://issues.apache.org/jira/browse/FLINK-6344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6344: --- Summary: Migrate from Java serialization for BucketingSink's state (was: Migrate from Java serialization for `BucketingSink`'s state) > Migrate from Java serialization for BucketingSink's state > - > > Key: FLINK-6344 > URL: https://issues.apache.org/jira/browse/FLINK-6344 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration > for `BucketingSink`. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6344) Migrate from Java serialization for `BucketingSink`'s state
Tzu-Li (Gordon) Tai created FLINK-6344: -- Summary: Migrate from Java serialization for `BucketingSink`'s state Key: FLINK-6344 URL: https://issues.apache.org/jira/browse/FLINK-6344 Project: Flink Issue Type: Sub-task Components: Streaming Connectors Reporter: Tzu-Li (Gordon) Tai See umbrella JIRA FLINK-6343 for details. This subtask tracks the migration for `BucketingSink`. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks
[ https://issues.apache.org/jira/browse/FLINK-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6343: --- Description: With FLINK-6324, the Java serialization shortcut for operator state is now deprecated to discourage its usage. These sources / sinks are still using this shortcut, and should be migrated to use {{getListState(descriptor)}} instead: - {{BucketingSink}} - {{ContinuousFileReaderOperator}} - {{GenericWriteAheadSink}} - {{MessageAcknowledgingSourceBase}} - {{RollingSink}} - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration included with FLINK-4022) To ease review, I propose to open up subtasks under this JIRA and separate PRs for the migration of each single source / sink. was: With FLINK-6324, the Java serialization shortcut for operator state is now deprecated to discourage its usage. These sources / sinks are still using this shortcut, and should be migrated to use `getListState(descriptor)` instead: - {{BucketingSink}} - {{ContinuousFileReaderOperator}} - {{GenericWriteAheadSink}} - {{MessageAcknowledgingSourceBase}} - {{RollingSink}} - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration included with FLINK-4022) To ease review, I propose to open up subtasks under this JIRA and separate PRs for the migration of each single source / sink. > Migrate from discouraged Java serialization for all sources / sinks > --- > > Key: FLINK-6343 > URL: https://issues.apache.org/jira/browse/FLINK-6343 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > With FLINK-6324, the Java serialization shortcut for operator state is now > deprecated to discourage its usage. 
> These sources / sinks are still using this shortcut, and should be migrated > to use {{getListState(descriptor)}} instead: > - {{BucketingSink}} > - {{ContinuousFileReaderOperator}} > - {{GenericWriteAheadSink}} > - {{MessageAcknowledgingSourceBase}} > - {{RollingSink}} > - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration > included with FLINK-4022) > To ease review, I propose to open up subtasks under this JIRA and separate > PRs for the migration of each single source / sink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks
[ https://issues.apache.org/jira/browse/FLINK-6343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-6343: --- Description: With FLINK-6324, the Java serialization shortcut for operator state is now deprecated to discourage its usage. These sources / sinks are still using this shortcut, and should be migrated to use `getListState(descriptor)` instead: - {{BucketingSink}} - {{ContinuousFileReaderOperator}} - {{GenericWriteAheadSink}} - {{MessageAcknowledgingSourceBase}} - {{RollingSink}} - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration included with FLINK-4022) To ease review, I propose to open up subtasks under this JIRA and separate PRs for the migration of each single source / sink. was: With FLINK-6324, the Java serialization shortcut for operator state is now deprecated to discourage its usage. These sources / sinks are still using this shortcut, and should be migrated to use `getListState(descriptor)` instead: - `BucketingSink` - `ContinuousFileReaderOperator` - `GenericWriteAheadSink` - `MessageAcknowledgingSourceBase` - `RollingSink` - `FlinkKafkaConsumerBase` (will be fixed along with the state migration included with FLINK-4022) To ease review, I propose to open up subtasks under this JIRA and separate PRs for the migration of each single source / sink. > Migrate from discouraged Java serialization for all sources / sinks > --- > > Key: FLINK-6343 > URL: https://issues.apache.org/jira/browse/FLINK-6343 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Reporter: Tzu-Li (Gordon) Tai > > With FLINK-6324, the Java serialization shortcut for operator state is now > deprecated to discourage its usage. 
> These sources / sinks are still using this shortcut, and should be migrated > to use `getListState(descriptor)` instead: > - {{BucketingSink}} > - {{ContinuousFileReaderOperator}} > - {{GenericWriteAheadSink}} > - {{MessageAcknowledgingSourceBase}} > - {{RollingSink}} > - {{FlinkKafkaConsumerBase}} (will be fixed along with the state migration > included with FLINK-4022) > To ease review, I propose to open up subtasks under this JIRA and separate > PRs for the migration of each single source / sink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6343) Migrate from discouraged Java serialization for all sources / sinks
Tzu-Li (Gordon) Tai created FLINK-6343: -- Summary: Migrate from discouraged Java serialization for all sources / sinks Key: FLINK-6343 URL: https://issues.apache.org/jira/browse/FLINK-6343 Project: Flink Issue Type: Improvement Components: Streaming Connectors Reporter: Tzu-Li (Gordon) Tai With FLINK-6324, the Java serialization shortcut for operator state is now deprecated to discourage its usage. These sources / sinks are still using this shortcut, and should be migrated to use `getListState(descriptor)` instead: - `BucketingSink` - `ContinuousFileReaderOperator` - `GenericWriteAheadSink` - `MessageAcknowledgingSourceBase` - `RollingSink` - `FlinkKafkaConsumerBase` (will be fixed along with the state migration included with FLINK-4022) To ease review, I propose to open up subtasks under this JIRA and separate PRs for the migration of each single source / sink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
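Why the Java-serialization shortcut is discouraged for checkpointed state can be shown with a standalone comparison. This is only an illustration of the trade-off, assuming a single long offset as state; it does not reproduce Flink's actual `getListState(descriptor)` API:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

public class StateSerializationSketch {

    // Discouraged shortcut: opaque Java serialization. The byte layout is tied
    // to serialVersionUID and the class's field layout, which makes evolving
    // state between versions brittle.
    static byte[] javaSerialize(Long offset) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(offset);
            oos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Preferred direction: an explicit, versionable encoding, analogous to
    // registering state through a typed descriptor so the framework controls
    // the serializer (illustrative only, not Flink's API).
    static byte[] explicitSerialize(Long offset) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(bos);
            dos.writeLong(offset);
            dos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Java serialization carries stream headers and class metadata;
        // the explicit encoding is exactly the 8 bytes of the long.
        System.out.println(javaSerialize(42L).length);
        System.out.println(explicitSerialize(42L).length); // 8
    }
}
```

The explicit form is what the listed sources / sinks move toward: state whose byte layout is owned by a serializer rather than by whatever the class happened to look like when the savepoint was taken.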
[jira] [Closed] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai closed FLINK-6311. -- Resolution: Fixed Fix Version/s: 1.2.2 1.3.0 Resolved for master via http://git-wip-us.apache.org/repos/asf/flink/commit/a0249d9. Resolved for release-1.2 via http://git-wip-us.apache.org/repos/asf/flink/commit/80dc704. > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > Fix For: 1.3.0, 1.2.2 > > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6311) NPE in FlinkKinesisConsumer if source was closed before run
[ https://issues.apache.org/jira/browse/FLINK-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977768#comment-15977768 ] ASF GitHub Bot commented on FLINK-6311: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3738 > NPE in FlinkKinesisConsumer if source was closed before run > --- > > Key: FLINK-6311 > URL: https://issues.apache.org/jira/browse/FLINK-6311 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Reporter: Tzu-Li (Gordon) Tai >Assignee: mingleizhang > > This was reported by a user: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-errors-out-and-job-fails-IOException-from-CollectSink-open-td12606.html > The {{shutdownFetcher}} method of {{KinesisDataFetcher}} is not protected > against the condition when the source was closed before it started running. > Both {{mainThread}} and {{shardConsumersExecutor}} should have null checks. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3738: [FLINK-6311] [Kinesis Connector] NPE in FlinkKines...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3738
[jira] [Commented] (FLINK-6013) Add Datadog HTTP metrics reporter
[ https://issues.apache.org/jira/browse/FLINK-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977755#comment-15977755 ] ASF GitHub Bot commented on FLINK-6013: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 Green build! @zentol > Add Datadog HTTP metrics reporter > - > > Key: FLINK-6013 > URL: https://issues.apache.org/jira/browse/FLINK-6013 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.0 >Reporter: Bowen Li >Assignee: Bowen Li >Priority: Critical > Fix For: 1.3.0 > > > We at OfferUp use Datadog a lot for metrics and dashboards, and I believe a > lot of other companies also do. > Flink right now only has a StatsD metrics reporter, and users have to set up > the Datadog Agent in order to receive metrics from StatsD and transport them to > Datadog. We don't like this approach. > We prefer to have a Datadog metrics reporter directly contacting the Datadog HTTP > endpoint. > I'll take this ticket myself.
[GitHub] flink issue #3736: [FLINK-6013][metrics] Add Datadog HTTP metrics reporter
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/3736 Green build! @zentol
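The idea behind FLINK-6013 — reporting directly to Datadog's HTTP endpoint instead of routing through a StatsD agent — amounts to serializing metric values into the JSON body of Datadog's public v1 `series` API and POSTing it. A hedged sketch follows; the endpoint shape matches Datadog's documented v1 API, but all class and method names are illustrative assumptions, not the reporter that was actually merged:

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of a direct-to-Datadog HTTP reporter.
class DatadogReporterSketch {
    // Build the JSON body for one gauge value; Datadog's v1 series API expects
    // {"series":[{"metric":...,"points":[[timestamp,value]],"type":"gauge"}]}
    static String buildPayload(String metric, long timestampSeconds, double value) {
        return "{\"series\":[{\"metric\":\"" + metric + "\","
                + "\"points\":[[" + timestampSeconds + "," + value + "]],"
                + "\"type\":\"gauge\"}]}";
    }

    static void report(String apiKey, String metric, double value) throws Exception {
        String body = buildPayload(metric, System.currentTimeMillis() / 1000, value);
        URL url = new URL("https://app.datadoghq.com/api/v1/series?api_key=" + apiKey);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        if (conn.getResponseCode() / 100 != 2) {
            throw new IllegalStateException("Datadog rejected metrics: " + conn.getResponseCode());
        }
    }
}
```

This removes the StatsD hop entirely: the reporter batches values and speaks HTTPS to Datadog itself.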
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977669#comment-15977669 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112488004 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala --- @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression { * Convert Aggregate to its counterpart in Calcite, i.e. AggCall */ private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall + + /** +* Because SqlAggFunction from Calcite's AggCallImpl is invisible, --- End diff -- Thanks for the explanation for this method. Can you update the doc to `Returns the SqlAggFunction for this Aggregation` and change the method name to `getSqlAggFunction()`? The method does not convert the Aggregation because the returned SqlAggFunction does not contain the arguments. > Integrating the OVER windows in the Table API > - > > Key: FLINK-6228 > URL: https://issues.apache.org/jira/browse/FLINK-6228 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Syntax: > {code} > table >.overWindows( > (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy > order_by_expression] > (preceding > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) > [following > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] > as alias,...[n]) >) > .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]) > {code} > Implement restrictions: > * All OVER clauses in the same SELECT clause must be exactly the same. > * The PARTITION BY clause is optional (no partitioning results in single > threaded execution). 
> * The ORDER BY: before the > [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation, > orderBy may only have 'rowtime'/'proctime' (for stream) / 'specific-time-field' (for > batch). > * FOLLOWING is not supported. > User interface design document [See | > https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977648#comment-15977648 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112456485 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -810,6 +810,47 @@ class Table( new WindowedTable(this, window) } + /** +* Groups the records of a table by assigning them to windows defined by a time or row interval. +* +* For streaming tables of infinite size, grouping into windows is required to define finite +* groups on which over-based aggregates can be computed. +* +* Over window for batch tables are currently not supported. +* +* @param overWindows windows that specifies how elements are grouped. +* @return Over windowed table +*/ + def window(overWindows: OverWindow*): OverWindowedTable = { + +if (tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw TableException("Over window for batch tables are currently not supported.") +} else { + overWindows.foreach { overWindow => +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name +if (!orderName.equalsIgnoreCase("rowtime") + && !orderName.equalsIgnoreCase("proctime")) { + throw ValidationException( +s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]") +} + } +} + +if (overWindows.size != 1) { + throw TableException("OverWindow only supported single window at current time.") +} + +overWindows.foreach { overWindow => + if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass --- End diff -- This check should be done in `OverCall.validateInput()` as well. 
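The reviewer's point in this thread — that the orderBy check belongs in `OverCall.validateInput()` rather than at the `window()` call site — is the usual expression-validation-hook pattern: each expression node validates its own input once, and call sites stop duplicating the check. A minimal sketch under that assumption (all names here are illustrative, not Flink's actual validation classes):

```java
// Sketch of the validation-hook pattern the reviewer points at: the check
// that an over window may only be ordered by rowtime/proctime lives in one
// place instead of being repeated at every call site.
class ValidationSketch {
    interface ValidationResult {
        boolean isSuccess();
        String message();
    }

    static ValidationResult success() {
        return new ValidationResult() {
            public boolean isSuccess() { return true; }
            public String message() { return ""; }
        };
    }

    static ValidationResult failure(final String msg) {
        return new ValidationResult() {
            public boolean isSuccess() { return false; }
            public String message() { return msg; }
        };
    }

    // Centralized check, mirroring the condition quoted in the diff above.
    static ValidationResult validateOrderBy(String orderName) {
        if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
            return success();
        }
        return failure("OrderBy expression must be ['rowtime] or ['proctime], but got ['" + orderName + "]");
    }
}
```

Moving the check into the expression's own validation hook means every code path that builds an `OverCall` gets the same error for an invalid orderBy field.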
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977681#comment-15977681 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570095 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) + } else { +// for batch 
+relBuilder.field(orderName) + } + +orderKeys.add(new RexFieldCollation(rexNode,sets)) + +val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +overWindow.partitionBy.foreach(x=> + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name))) + +val preceding = overWindow.preceding.asInstanceOf[Literal] +val following = overWindow.following.asInstanceOf[Literal] + +val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo] + +val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING) +val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING) + +rexBuilder.makeOver( + relDataType, + operator, + aggExprs, + partitionKeys, + orderKeys.build, + lowerBound, + upperBound, + isPhysical, + true, + false) + } + + private def createBound( +relBuilder: RelBuilder, +precedingValue: Long, +sqlKind: SqlKind): RexWindowBound = { + +if (precedingValue == Long.MaxValue) { + val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO) + create(unbounded, null) +} else if (precedingValue == 0L) { + val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO) + create(currentRow, null) +} else { + + val returnType = new BasicSqlType( +relBuilder.getTypeFactory.getTypeSystem, +SqlTypeName.DECIMAL) + + val sqlOperator = new SqlPostfixOperator( +sqlKind.name, +sqlKind, +2, +new OrdinalReturnTypeInference(0), +null, +null) + + val operands: Array[SqlNode] = new Array[SqlNode](1) + operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)) + + val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO) + + val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() + expressions.add(relBuilder.literal(precedingValue)) + + val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions) + + create(node, rexNode) +} + } + + override private[flink] def children: Seq[Expression] = Seq() --- 
End diff -- maybe we can check the partitionBy
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977653#comment-15977653 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112482490 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala --- @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser} import org.apache.flink.table.plan.logical._ /** + * An over window specification. + * + * Over window is similar to the traditional OVER SQL. + */ +class OverWindow { + + private[flink] var alias: Expression = _ + private[flink] var partitionBy: Seq[Expression] = Seq[Expression]() + private[flink] var orderBy: Expression = _ + private[flink] var preceding: Expression = _ + private[flink] var following: Expression = null + + /** +* Assigns an alias for this window that the following `select()` clause can refer to. +* +* @param alias alias for this over window +* @return this over window +*/ + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to. +* +* @param alias alias for this over window +* @return this over window +*/ + def as(alias: Expression): OverWindow = { --- End diff -- I think it would be good if we could enforce the alias. This could be done by first building a window which is not of type `OverWindow` and letting `as()` return the complete `OverWindow`
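The suggestion above — returning the finished `OverWindow` only from `as()` — is a staged-builder pattern: the pre-alias stage has no other exit, so forgetting the alias becomes a compile error instead of a runtime validation failure. A hedged sketch (all names are illustrative, not Flink's actual classes):

```java
// Staged-builder sketch of the reviewer's idea: the only way to obtain an
// OverWindow is to call as(), so an alias can never be forgotten.
class StagedOverWindow {
    // Stage 1: window definition without an alias.
    static class OverWindowWithoutAlias {
        final String orderBy;
        OverWindowWithoutAlias(String orderBy) { this.orderBy = orderBy; }

        // The only exit from this stage: supply an alias, get the real window.
        OverWindow as(String alias) { return new OverWindow(orderBy, alias); }
    }

    // Stage 2: the complete window; select() would only accept this type.
    static class OverWindow {
        final String orderBy;
        final String alias;
        OverWindow(String orderBy, String alias) {
            this.orderBy = orderBy;
            this.alias = alias;
        }
    }

    static OverWindowWithoutAlias orderBy(String field) {
        return new OverWindowWithoutAlias(field);
    }
}
```

With this shape, `orderBy("rowtime")` alone never type-checks where an `OverWindow` is required; the caller is forced through `.as("w")`.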
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977667#comment-15977667 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112501222 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.typeutils + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.api.TableException + +object SqlTypeUtils { --- End diff -- Util class can be removed. 
We can create a `RelDataType` from a `TypeInformation` as follows: ``` relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo) ```
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977678#comment-15977678 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112561580 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) + } else { +// for batch 
+relBuilder.field(orderName) + } + +orderKeys.add(new RexFieldCollation(rexNode,sets)) + +val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +overWindow.partitionBy.foreach(x=> --- End diff -- +space `foreach( x =>`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977655#comment-15977655 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112471307 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -216,6 +216,26 @@ object ProjectionTranslator { projectList += unresolved } + case OverCall(agg, aggAlias, alias, _) => +val overWindow = overWindows.find(_.alias.equals(alias)) +val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name +val childrenOutput = parent.output +val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName)) + +val resolvedFieldReference = if (candidates.length > 1) { --- End diff -- The aggregations are automatically resolved and validated if we add `agg` to the children of `OverCall`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977661#comment-15977661 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112473915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -810,6 +810,47 @@ class Table( new WindowedTable(this, window) } + /** +* Groups the records of a table by assigning them to windows defined by a time or row interval. +* +* For streaming tables of infinite size, grouping into windows is required to define finite +* groups on which over-based aggregates can be computed. +* +* Over window for batch tables are currently not supported. +* +* @param overWindows windows that specifies how elements are grouped. +* @return Over windowed table +*/ + def window(overWindows: OverWindow*): OverWindowedTable = { + +if (tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw TableException("Over window for batch tables are currently not supported.") +} else { + overWindows.foreach { overWindow => +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name --- End diff -- This check should be done in `OverCall.validateInput()`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977674#comment-15977674 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112495783 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend case expr if !expr.valid => u case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp") case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i")) +case over: OverCall if null != over.aggAlias => --- End diff -- We don't need this if we handle the aggregation alias as a regular `Alias` expression
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977658#comment-15977658 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112485134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( --- End diff -- This class should override `checkInputs()` and check that the interval configuration is valid (unbounded - current row, preceding - current row), and the checks which are moved from `table.scala`.
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977679#comment-15977679 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -93,28 +96,43 @@ class DataStreamOverAggregate( val orderKeys = overWindow.orderKeys.getFieldCollations -if (orderKeys.size() != 1) { - throw new TableException( -"Unsupported use of OVER windows. The window can only be ordered by a single time column.") -} -val orderKey = orderKeys.get(0) +val timeType = if (!orderKeys.isEmpty) { + if (orderKeys.size() != 1) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered by a single time " + +"column.") + } + val orderKey = orderKeys.get(0) + if (!orderKey.direction.equals(ASCENDING)) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + } + inputType +.getFieldList +.get(orderKey.getFieldIndex) +.getValue.asInstanceOf[TimeModeType] +} else { --- End diff -- this could be removed if we don't use literals for the orderBy() field
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977680#comment-15977680 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112557404

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }

     /**
    + * Over expression for calcite over transform.
    + *
    + * @param agg             over-agg expression
    + * @param aggAlias        agg alias for following `select()` clause.
    + * @param overWindowAlias over window alias
    + * @param overWindow      over window
    + */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    --- End diff --

    +space `val sets: util.HashSet`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977662#comment-15977662 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112461145

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
               projectList += unresolved
           }

    +      case OverCall(agg, aggAlias, alias, _) =>
    +        val overWindow = overWindows.find(_.alias.equals(alias))
    +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
    +        val childrenOutput = parent.output
    +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
    +
    +        val resolvedFieldReference = if (candidates.length > 1) {
    +          throw new TableException(s"Reference $aggName is ambiguous.")
    +        } else if (candidates.isEmpty) {
    +          throw new TableException(s"Can not resolve [$aggName].")
    +        } else {
    +          Some(candidates.head.withName(aggName))
    +        }
    +
    +        projectList += new OverCall(
    --- End diff --

    remove `new`
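The review point above — that Scala case classes are conventionally constructed through the compiler-generated `apply()` rather than `new` — can be shown with a self-contained sketch. The `OverCall` here is a simplified stand-in with made-up fields, not Flink's actual class:

```scala
// Simplified stand-in for Flink's OverCall case class (illustrative only;
// the real class has different fields).
case class OverCall(aggName: String, windowAlias: String)

// Idiomatic construction: the compiler-generated companion apply() is used,
// so no `new` is needed.
val viaApply = OverCall("count", "w")

// `new` also compiles, but is redundant for case classes -- hence the review
// comment asking to remove it.
val viaNew = new OverCall("count", "w")

// Case classes get structural equality, so both forms yield equal values.
assert(viaApply == viaNew)
println(viaApply)
```

Beyond brevity, using `apply()` keeps call sites uniform if the companion later adds validation or alternative factory overloads.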
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977657#comment-15977657 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112469838

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
    @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {
        * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
        */
       private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
    +
    +  /**
    +    * Because SqlAggFunction from Calcite's AggCallImpl is invisible,
    +    * we have to manually create sqlAggFunction in flink code base.
    +    */
    +  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
    +
    +  /**
    +    * Attach the Resolved Child to aggregation
    +    */
    +  private[flink] def withResolvedChild(child: Expression): Aggregation
    --- End diff --

    remove the `withResolvedChild()` method as we validate the aggregation argument with the default validation.
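The diff above adds an abstract `toSqlAggFunction()` to the sealed `Aggregation` base class, so each concrete aggregation supplies its own Calcite counterpart. The pattern can be sketched without Calcite; all type names below are hypothetical stand-ins, not Flink's or Calcite's actual API:

```scala
// Hypothetical stand-ins for Calcite's SqlAggFunction values (illustration only).
sealed trait SqlAggFunction { def name: String }
case object SqlSum extends SqlAggFunction { val name = "SUM" }
case object SqlCount extends SqlAggFunction { val name = "COUNT" }

// A sealed abstract base forces every concrete aggregation to provide its own
// conversion, mirroring the abstract toSqlAggFunction() in the diff.
sealed abstract class Aggregation {
  def toSqlAggFunction(): SqlAggFunction
}

case class Sum(field: String) extends Aggregation {
  override def toSqlAggFunction(): SqlAggFunction = SqlSum
}

case class Count(field: String) extends Aggregation {
  override def toSqlAggFunction(): SqlAggFunction = SqlCount
}

// Sealing also lets the compiler check pattern matches over Aggregation
// for exhaustiveness.
val aggs: List[Aggregation] = List(Sum("a"), Count("b"))
println(aggs.map(_.toSqlAggFunction().name).mkString(","))
```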
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977675#comment-15977675 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112567644

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
    [...]
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +      relBuilder: RelBuilder,
    +      precedingValue: Long,
    +      sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (precedingValue == Long.MaxValue) {
    --- End diff --

    Please use the constants defined in `expressionDsl.scala` for the checks
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977659#comment-15977659 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r11245

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }

    +  /**
    --- End diff --

    Extend docs to
    ```
    /**
      * Defines over-windows on the records of a table.
      *
      * An over-window defines for each record an interval of records over which aggregation
      * functions can be computed.
      *
      * Example:
      *
      * {{{
      *   table
      *     .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
      *     .select('c, 'b.count over 'ow, 'e.sum over 'ow)
      * }}}
      *
      * __Note__: Computing over window aggregates on a streaming table is only a parallel operation
      * if the window is partitioned. Otherwise, the whole stream will be processed by a single
      * task, i.e., with parallelism 1.
      *
      * __Note__: Over-windows for batch tables are currently not supported.
      *
      * @param overWindows windows that specify the record interval over which aggregations are
      *                    computed.
      * @return An OverWindowedTable to specify the aggregations.
      */
    ```
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977660#comment-15977660 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112456740

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -928,6 +969,27 @@ class WindowedTable(
     }

    +class OverWindowedTable(
    +    private[flink] val table: Table,
    +    private[flink] val overWindows: OverWindow*) {
    --- End diff --

    Please use `Array` instead of varargs for internal methods & classes.
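The varargs-versus-`Array` advice above can be illustrated with a stripped-down sketch. Both class names and fields below are hypothetical simplifications, not Flink's real `OverWindowedTable`:

```scala
// Hypothetical simplified OverWindow; the real Flink class differs.
class OverWindow(val alias: String)

// Varargs signature: convenient for user-facing call sites, but the parameter
// is a Seq under the hood and forwarding an existing collection needs `: _*`.
class OverWindowedTableVarargs(windows: OverWindow*) {
  def aliases: Seq[String] = windows.map(_.alias).toList
}

// Array signature, as suggested for internal classes: explicit and unambiguous
// when the windows already live in a collection.
class OverWindowedTableArray(windows: Array[OverWindow]) {
  def aliases: Seq[String] = windows.map(_.alias).toList
}

val ws = Array(new OverWindow("w1"), new OverWindow("w2"))
val viaArray = new OverWindowedTableArray(ws)
val viaVarargs = new OverWindowedTableVarargs(ws: _*) // note the `: _*` adaptor
assert(viaArray.aliases == viaVarargs.aliases)
println(viaArray.aliases.mkString(","))
```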
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977665#comment-15977665 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112456197

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }

    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which over-based aggregates can be computed.
    +    *
    +    * Over windows for batch tables are currently not supported.
    +    *
    +    * @param overWindows windows that specify how elements are grouped.
    +    * @return Over windowed table
    +    */
    +  def window(overWindows: OverWindow*): OverWindowedTable = {
    +
    +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw TableException("Over window for batch tables are currently not supported.")
    +    } else {
    +      overWindows.foreach { overWindow =>
    +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    --- End diff --

    The other operators are validated later. Can you move this check to `OverCall.validateInput()`? Please add tests to validate that the checks work.
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977671#comment-15977671 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112567381

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
    [...]
    +  private def createBound(
    +      relBuilder: RelBuilder,
    +      precedingValue: Long,
    --- End diff --

    `precedingValue` -> `value`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977654#comment-15977654 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112476982

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
    @@ -364,6 +365,21 @@ trait ImplicitExpressionOperations {
       def position(haystack: Expression) = Position(expr, haystack)

       /**
    +    * For windowing function to config over window
    +    * e.g.:
    +    * table
    +    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
    +    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
    +    */
    +  def over(alias: Expression) = {
    +    expr match {
    +      case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], null, alias)
    --- End diff --

    rm `new` (case classes should be instantiated with the `apply()` method, i.e., without `new`)
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977677#comment-15977677 ]

ASF GitHub Bot commented on FLINK-6228:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570409

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala ---
    @@ -23,7 +23,9 @@
     import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
     import org.apache.calcite.rel.core.AggregateCall
     import org.apache.calcite.rel.core.Window.Group
     import org.apache.calcite.rel.core.Window
    -import org.apache.calcite.rex.{RexInputRef}
    +import org.apache.calcite.rex.{RexInputRef, RexLiteral}
    --- End diff --

    All these changes could be undone if we don't use a literal for the orderBy() expression
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112461145

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
    [...]
    +        projectList += new OverCall(
    --- End diff --

    remove `new`

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112560976

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
    [...]
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    --- End diff --

    If we do this, we could remove the special literal handling in `OverAggregate` and `DataStreamOverAggregate`.
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977684#comment-15977684 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570917
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.RowTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class OverWindowITCase extends StreamingWithStateTestBase {
+
+  @Test
+  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
+
+    val data = List(
+      (1L, 1, "Hello"),
+      (2L, 2, "Hello"),
+      (3L, 3, "Hello"),
+      (4L, 4, "Hello"),
+      (5L, 5, "Hello"),
+      (6L, 6, "Hello"),
+      (7L, 7, "Hello World"),
+      (8L, 8, "Hello World"),
+      (20L, 20, "Hello World"))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c)
+
+    val windowedTable = table
+      .window(
+        Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
+      .select('c, 'b.count over 'w as 'mycount)
+      .select('c, 'mycount)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "Hello World,1", "Hello World,2", "Hello World,3",
+      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.testResults = mutable.MutableList()
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val data = Seq(
+      Left(1405L, (1, 1L, "Hi")),
+      Left(1400L, (2, 1L, "Hello")),
+      Left(1402L, (1, 1L, "Hello")),
+      Left(1402L, (1, 2L, "Hello")),
+      Left(1402L, (1, 3L, "Hello world")),
+      Left(1403L, (2, 2L, "Hello world")),
+      Left(1403L, (2, 3L, "Hello world")),
+      Right(1420L),
+      Left(1421L, (1, 4L, "Hello world")),
+      Left(1422L, (1, 5L, "Hello world")),
+      Left(1422L, (1, 6L, "Hello world")),
+      Left(1422L, (1, 7L, "Hello world")),
+      Left(1423L, (2, 4L, "Hello world")),
+      Left(1423L, (2, 5L, "Hello world")),
+      Right(1430L)
+
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112469838
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {
    * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
    */
   private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
+
+  /**
+    * Because the SqlAggFunction inside Calcite's AggCallImpl is not visible,
+    * we have to manually create the SqlAggFunction in the Flink code base.
+    */
+  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
+
+  /**
+    * Attaches the resolved child to the aggregation.
+    */
+  private[flink] def withResolvedChild(child: Expression): Aggregation
--- End diff --
Remove the `withResolvedChild()` method, as we validate the aggregation argument with the default validation.
---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112571486
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+ * An over window specification.
+ *
+ * An over window is similar to the traditional OVER clause in SQL.
+ */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return this over window
+    */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return this over window
+    */
+  def as(alias: Expression): OverWindow = {
+    this.alias = alias
+    this
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * @param partitionBy the partition keys
+    * @return this over window
+    */
+  def partitionBy(partitionBy: String): OverWindow = {
+    this.partitionBy(ExpressionParser.parseExpression(partitionBy))
+  }
+
+  /**
+    * Partitions the elements on some partition keys.
+    *
+    * @param partitionBy the partition keys
+    * @return this over window
+    */
+  def partitionBy(partitionBy: Expression*): OverWindow = {
+    this.partitionBy = partitionBy
+    this
+  }
+
+  /**
+    * Specifies the time mode.
+    *
+    * @param orderBy for streaming tables, call [[orderBy 'rowtime or orderBy 'proctime]]
+    *                to specify the time mode
+    * @return this over window
+    */
+  def orderBy(orderBy: String): OverWindow = {
+    this.orderBy(ExpressionParser.parseExpression(orderBy))
+  }
+
+  /**
+    * Specifies the time mode.
+    *
+    * @param orderBy for streaming tables, call [[orderBy 'rowtime or orderBy 'proctime]]
+    *                to specify the time mode
+    * @return this over window
+    */
+  def orderBy(orderBy: Expression): OverWindow = {
+    this.orderBy = orderBy
+    this
+  }
+
+  /**
+    * Sets the preceding offset (based on time or row-count intervals) for the over window.
+    *
+    * @param preceding offset relative to the current row
+    * @return this over window
+    */
+  def preceding(preceding: String): OverWindow = {
+    this.preceding(ExpressionParser.parseExpression(preceding))
+  }
+
+  /**
+    * Sets the preceding offset (based on time or row-count intervals) for the over window.
+    *
+    * @param preceding offset relative to the current row
+    * @return this over window
+    */
+  def preceding(preceding: Expression): OverWindow = {
+    this.preceding = preceding
+    this
+  }
+
+  /**
+    * Sets the following offset (based on time or row-count intervals) for the over window.
+    *
+    * @param following offset relative to the current row
+    * @return this over window
+    */
+  def following(following: String): OverWindow = {
--- End diff --
Should we make `following` optional and default to CURRENT_ROW / CURRENT_RANGE depending on the type of `preceding`? I think that would be a nice shortcut and would also be aligned with SQL. What do you think @sunjincheng121 ?
---
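The defaulting rule suggested in the comment above (derive the frame end from the kind of `preceding`) can be sketched with a small standalone model. The type names below are hypothetical stand-ins, not Flink's actual expression classes:

```scala
// Toy model of over-window frame bounds (hypothetical names, not Flink's API).
sealed trait Bound
case object CURRENT_ROW extends Bound                      // row-count frame end
case object CURRENT_RANGE extends Bound                    // time-interval frame end
case object UNBOUNDED_ROW extends Bound                    // unbounded row-count start
case object UNBOUNDED_RANGE extends Bound                  // unbounded time-interval start
final case class RowCount(rows: Long) extends Bound        // e.g. "10.rows"
final case class TimeInterval(millis: Long) extends Bound  // e.g. "10.minutes"

// If `following` is omitted, default it from the kind of `preceding`,
// mirroring SQL's implicit "... AND CURRENT ROW" frame end.
def defaultFollowing(preceding: Bound): Bound = preceding match {
  case RowCount(_) | UNBOUNDED_ROW       => CURRENT_ROW
  case TimeInterval(_) | UNBOUNDED_RANGE => CURRENT_RANGE
  case explicitEnd                       => explicitEnd
}
```

With this rule, a row-count `preceding` always yields a ROWS frame and a time-interval `preceding` always yields a RANGE frame, which is the alignment with SQL the comment asks about.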
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112494372
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }
 
 /**
+ * Over expression for the Calcite over transform.
+ *
+ * @param agg             over-agg expression
+ * @param aggAlias        agg alias for the following `select()` clause
+ * @param overWindowAlias over window alias
+ * @param overWindow      over window
+ */
+case class OverCall(
+    agg: Aggregation,
+    var aggAlias: Expression,
--- End diff --
The `aggAlias` should be removed.
---
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977676#comment-15977676 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112499806
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -93,28 +96,43 @@ class DataStreamOverAggregate(
     val orderKeys = overWindow.orderKeys.getFieldCollations
 
-    if (orderKeys.size() != 1) {
-      throw new TableException(
-        "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
-    }
-    val orderKey = orderKeys.get(0)
+    val timeType = if (!orderKeys.isEmpty) {
+      if (orderKeys.size() != 1) {
+        throw new TableException(
+          "Unsupported use of OVER windows. The window can only be ordered by a single time " +
+            "column.")
+      }
+      val orderKey = orderKeys.get(0)
 
-    if (!orderKey.direction.equals(ASCENDING)) {
-      throw new TableException(
-        "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
+      if (!orderKey.direction.equals(ASCENDING)) {
+        throw new TableException(
+          "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
+      }
+      inputType
+        .getFieldList
+        .get(orderKey.getFieldIndex)
+        .getValue.asInstanceOf[TimeModeType]
+    } else {
+      val it = logicWindow.constants.listIterator()
+      if (it.hasNext) {
+        val item = it.next().getValue
+        if (item.isInstanceOf[NlsString]) {
+          val value = item.asInstanceOf[NlsString].getValue
+          if (value.equalsIgnoreCase("rowtime")) {
+            new RowTimeType
+          } else {
+            new ProcTimeType
+          }
+        }
+      }
+    }
 
     val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
 
     val generator = new CodeGenerator(
-      tableEnv.getConfig,
-      false,
-      inputDS.getType)
-
-    val timeType = inputType
-      .getFieldList
-      .get(orderKey.getFieldIndex)
-      .getValue
+      tableEnv.getConfig,
--- End diff --
indent

> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
> Key: FLINK-6228
> URL: https://issues.apache.org/jira/browse/FLINK-6228
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>     (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>     [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>     as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> * All OVER clauses in the same SELECT clause must be exactly the same.
> * The PARTITION BY clause is optional (no partitioning results in single threaded execution).
> * ORDER BY: before the [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] implementation, orderBy may only reference 'rowtime/'proctime (for streams) or a specific time field (for batch).
> * FOLLOWING is not supported.
> User interface design document [See | https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#]
--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977668#comment-15977668 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112495160
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }
 
 /**
+ * Over expression for the Calcite over transform.
+ *
+ * @param agg             over-agg expression
+ * @param aggAlias        agg alias for the following `select()` clause
+ * @param overWindowAlias over window alias
+ * @param overWindow      over window
+ */
+case class OverCall(
+    agg: Aggregation,
+    var aggAlias: Expression,
+    overWindowAlias: Expression,
+    var overWindow: OverWindow = null) extends Expression {
+
+  private[flink] def as(aggAlias: Expression): OverCall = {
--- End diff --
The proper translation can be done in `ProjectTranslator.translateOverWindow()`.
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977652#comment-15977652 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112460318
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -216,6 +216,26 @@ object ProjectionTranslator {
           projectList += unresolved
       }
 
+      case OverCall(agg, aggAlias, alias, _) =>
+        val overWindow = overWindows.find(_.alias.equals(alias))
+        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
+        val childrenOutput = parent.output
--- End diff --
`childOutput` or `parentOutput`?
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977673#comment-15977673 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112564205
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
 }
 
 /**
+ * Over expression for the Calcite over transform.
+ *
+ * @param agg             over-agg expression
+ * @param aggAlias        agg alias for the following `select()` clause
+ * @param overWindowAlias over window alias
+ * @param overWindow      over window
+ */
+case class OverCall(
--- End diff --
We also need to validate other properties of the overWindow:
- Check that the partitionBy expressions are valid fields in the input.
- If a partitionBy expression is not a field, we would need to push the expression into a Project before the Project with the overWindow and reference the new field. This would need to happen in `Table.window()`. I think for now we can have the restriction that only field references are allowed expressions.
- Validate that `preceding` and `following` are literals.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112459590
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
 
   def expandProjectList(
--- End diff --
I would not change the `expandProjectList()` method, which has a dedicated purpose. Instead, I would add a new method to `ProjectionTranslator` which just translates the over windows:
```
def translateOverWindows(
    exprs: Seq[Expression],
    overWindows: Array[OverWindow]): Seq[Expression]
```
---
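As a rough illustration of what a helper with that signature might do, the sketch below resolves each over-call's window alias against the declared windows. The types here are deliberately simplified stand-ins, not Flink's actual `Expression` or `OverWindow` classes:

```scala
// Simplified stand-ins for Flink's expression types (hypothetical).
final case class OverWindow(alias: String, partitionBy: Seq[String], orderBy: String)

sealed trait Expression
final case class Field(name: String) extends Expression
final case class OverCall(agg: String, windowAlias: String,
                          window: Option[OverWindow] = None) extends Expression

// Resolve each OverCall against the windows declared in window(...);
// all other expressions pass through unchanged.
def translateOverWindows(
    exprs: Seq[Expression],
    overWindows: Array[OverWindow]): Seq[Expression] =
  exprs.map {
    case oc @ OverCall(_, alias, _) =>
      overWindows.find(_.alias == alias) match {
        case Some(w) => oc.copy(window = Some(w))
        case None =>
          throw new IllegalArgumentException(s"Unknown over window alias: $alias")
      }
    case e => e
  }
```

Keeping this in its own method leaves `expandProjectList()` untouched, which is the point of the suggestion above.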
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977651#comment-15977651 ] ASF GitHub Bot commented on FLINK-6228:
---
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112457606
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
@@ -190,8 +190,8 @@ object ProjectionTranslator {
 
   def expandProjectList(
       exprs: Seq[Expression],
       parent: LogicalNode,
-      tableEnv: TableEnvironment)
-    : Seq[Expression] = {
+      tableEnv: TableEnvironment,
+      overWindows: OverWindow*): Seq[Expression] = {
--- End diff --
Please use `Array` instead of varargs.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112482490
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
@@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 import org.apache.flink.table.plan.logical._
 
 /**
+ * An over window specification.
+ *
+ * An over window is similar to the traditional OVER clause in SQL.
+ */
+class OverWindow {
+
+  private[flink] var alias: Expression = _
+  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
+  private[flink] var orderBy: Expression = _
+  private[flink] var preceding: Expression = _
+  private[flink] var following: Expression = null
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return this over window
+    */
+  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to.
+    *
+    * @param alias alias for this over window
+    * @return this over window
+    */
+  def as(alias: Expression): OverWindow = {
--- End diff --
I think it would be good if we could enforce the alias. This could be done by first building a window which is not of type `OverWindow` and letting `as()` return the complete `OverWindow`.
---
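The "enforce the alias" idea could look roughly like this (the class names are hypothetical): the partial builder is deliberately not an `OverWindow`, so only `as()` yields a value that `window(...)` would accept, turning a forgotten alias into a compile-time error.

```scala
// A window still under construction: note this type is NOT an OverWindow,
// so it cannot be passed where an OverWindow is required.
final class PartialOverWindow(
    val partitionBy: Seq[String],
    val orderBy: String,
    val preceding: String) {

  // Only the aliased form produces a complete OverWindow.
  def as(alias: String): OverWindow =
    new OverWindow(alias, partitionBy, orderBy, preceding)
}

// The complete, aliased window specification.
final class OverWindow(
    val alias: String,
    val partitionBy: Seq[String],
    val orderBy: String,
    val preceding: String)
```

For example, `new PartialOverWindow(Seq("c"), "proctime", "UNBOUNDED_ROW").as("w")` type-checks, while passing the un-aliased builder to a method expecting `OverWindow` does not compile.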
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112460885 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -216,6 +216,26 @@ object ProjectionTranslator { projectList += unresolved } + case OverCall(agg, aggAlias, alias, _) => +val overWindow = overWindows.find(_.alias.equals(alias)) +val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name +val childrenOutput = parent.output +val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName)) + +val resolvedFieldReference = if (candidates.length > 1) { --- End diff -- The other expressions are resolved later. We should follow this pattern to keep the code base consistent. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112485134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( --- End diff -- This class should override `checkInputs()` and check that the interval configuration is valid (unbounded - current row, preceding - current row), as well as the checks moved from `table.scala`.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112559640 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) --- End diff -- can we use `relBuilder.call(EventTimeExtractor)` and `relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL? 
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112479103 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -586,6 +602,13 @@ trait ImplicitExpressionOperations { * to [[ImplicitExpressionOperations]]. */ trait ImplicitExpressionConversions { + + implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue) + implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue) + + implicit val CURRENT_ROW = toRowInterval(0L) + implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue) --- End diff -- Isn't a range interval of `0` a valid range to use? It would mean all rows that arrived in the same millisecond as the current row, no? Should we use `-1` as the constant for current row instead?
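One way to read the suggestion (hypothetical constants, assuming `toRowInterval`/`toMilliInterval` accept a sentinel value; this is not the merged API):

```scala
// Hypothetical: a -1 sentinel marks "current row", so that a genuine
// 0-millisecond range interval stays expressible for users.
implicit val CURRENT_ROW = toRowInterval(-1L)
implicit val CURRENT_RANGE = toMilliInterval(-1L, Long.MaxValue)
```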
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112456485 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -810,6 +810,47 @@ class Table( new WindowedTable(this, window) } + /** +* Groups the records of a table by assigning them to windows defined by a time or row interval. +* +* For streaming tables of infinite size, grouping into windows is required to define finite +* groups on which over-based aggregates can be computed. +* +* Over window for batch tables are currently not supported. +* +* @param overWindows windows that specifies how elements are grouped. +* @return Over windowed table +*/ + def window(overWindows: OverWindow*): OverWindowedTable = { + +if (tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw TableException("Over window for batch tables are currently not supported.") +} else { + overWindows.foreach { overWindow => +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name +if (!orderName.equalsIgnoreCase("rowtime") + && !orderName.equalsIgnoreCase("proctime")) { + throw ValidationException( +s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]") +} + } +} + +if (overWindows.size != 1) { + throw TableException("OverWindow only supported single window at current time.") +} + +overWindows.foreach { overWindow => + if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass --- End diff -- This check should be done in `OverCall.validateInput()` as well.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112476982 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -364,6 +365,21 @@ trait ImplicitExpressionOperations { def position(haystack: Expression) = Position(expr, haystack) /** +* For windowing function to config over window +* e.g.: +* table +* .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w) +* .select('c, 'a, 'a.count over 'w, 'a.sum over 'w) +*/ + def over(alias: Expression) = { +expr match { + case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], null, alias) --- End diff -- rm `new` (case classes should be instantiated with the `apply()` method, i.e., without `new`)
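The comment refers to the usual Scala convention; a generic illustration (unrelated to the Flink code itself):

```scala
// Case classes get a synthesized companion apply(), so `new` is redundant.
case class Point(x: Int, y: Int)

val a = new Point(1, 2) // compiles, but unidiomatic
val b = Point(1, 2)     // preferred: companion apply()
// Case classes also get structural equality:
assert(a == b)
```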
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977664#comment-15977664 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112555311 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala --- @@ -83,3 +83,49 @@ object Session { */ def withGap(gap: Expression): SessionWindow = new SessionWindow(gap) } + +/** + * Helper object for creating a over window. + */ +object Over { --- End diff -- I think we need an equivalent class for the Java Table API, similar to `org.apache.flink.table.api.java.groupWindows.scala`.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112471307 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -216,6 +216,26 @@ object ProjectionTranslator { projectList += unresolved } + case OverCall(agg, aggAlias, alias, _) => +val overWindow = overWindows.find(_.alias.equals(alias)) +val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name +val childrenOutput = parent.output +val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName)) + +val resolvedFieldReference = if (candidates.length > 1) { --- End diff -- The aggregations are automatically resolved and validated if we add `agg` to the children of `OverCall`
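The suggestion relies on the expression tree walker visiting `children`; a rough sketch (hypothetical, simplified signature, omitting the other members an `Expression` subclass needs):

```scala
// Hypothetical sketch: exposing the aggregation as a child lets the generic
// resolution/validation pass handle it like any other sub-expression, so no
// manual lookup against parent.output is needed in ProjectionTranslator.
case class OverCall(
    agg: Aggregation,
    overWindowAlias: Expression) extends Expression {

  // The tree walker resolves and validates everything returned here.
  override private[flink] def children: Seq[Expression] = Seq(agg)
}
```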
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977682#comment-15977682 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112571486 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala --- @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser} import org.apache.flink.table.plan.logical._ /** + * An over window specification. + * + * Over window is similar to the traditional OVER SQL. + */ +class OverWindow { + + private[flink] var alias: Expression = _ + private[flink] var partitionBy: Seq[Expression] = Seq[Expression]() + private[flink] var orderBy: Expression = _ + private[flink] var preceding: Expression = _ + private[flink] var following: Expression = null + + /** +* Assigns an alias for this window that the following `select()` clause can refer to. +* +* @param alias alias for this over window +* @return this over window +*/ + def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to. +* +* @param alias alias for this over window +* @return this over window +*/ + def as(alias: Expression): OverWindow = { +this.alias = alias +this + } + + /** +* Partitions the elements on some partition keys. +* +* @param partitionBy +* @return this over window +*/ + def partitionBy(partitionBy: String): OverWindow = { +this.partitionBy(ExpressionParser.parseExpression(partitionBy)) + } + + /** +* Partitions the elements on some partition keys. +* +* @param partitionBy +* @return this over window +*/ + def partitionBy(partitionBy: Expression*): OverWindow = { +this.partitionBy = partitionBy +this + } + + + /** +* Specifies the time mode. 
+* +* @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] +*to specify time mode. +* @return this over window +*/ + def orderBy(orderBy: String): OverWindow = { +this.orderBy(ExpressionParser.parseExpression(orderBy)) + } + + /** +* Specifies the time mode. +* +* @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] +*to specify time mode. +* @return this over window +*/ + def orderBy(orderBy: Expression): OverWindow = { +this.orderBy = orderBy +this + } + + /** +* Set the preceding offset (based on time or row-count intervals) for over window +* +* @param preceding forward offset that relative to the current row +* @return this over window +*/ + def preceding(preceding: String): OverWindow = { +this.preceding(ExpressionParser.parseExpression(preceding)) + } + + /** +* Set the preceding offset (based on time or row-count intervals) for over window +* +* @param preceding forward offset that relative to the current row +* @return this over window +*/ + def preceding(preceding: Expression): OverWindow = { +this.preceding = preceding +this + } + + /** +* Set the following offset (based on time or row-count intervals) for over window +* +* @param following subsequent offset that relative to the current row +* @return this over window +*/ + def following(following: String): OverWindow = { --- End diff -- Should we make `following` optional and default to CURRENT_ROW / CURRENT_RANGE depending on the type of `preceding`? I think that would be a nice shortcut and also be aligned with SQL. What do you think @sunjincheng121 ?
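Putting the fluent builder together, usage would look like the example quoted earlier in the DSL scaladoc (a sketch based on the API under review in this PR, not the final interface):

```scala
// Sketch of the over-window Table API discussed in this thread.
val result = table
  .window(
    Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
  .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
```

If `following` defaulted to CURRENT_ROW / CURRENT_RANGE as suggested, the `following CURRENT_ROW` clause above could be dropped.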
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977670#comment-15977670 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112494372 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, --- End diff -- The `aggAlias` should be removed.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112460318 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -216,6 +216,26 @@ object ProjectionTranslator { projectList += unresolved } + case OverCall(agg, aggAlias, alias, _) => +val overWindow = overWindows.find(_.alias.equals(alias)) +val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name +val childrenOutput = parent.output --- End diff -- `childOutput` or `parentOutput`?
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112494282 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { --- End diff -- `OverCall` should not implement its own `as` method. We should use the regular `Alias` expression for renaming expressions.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112567381 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) + } else { +// for batch +relBuilder.field(orderName) + } + +orderKeys.add(new RexFieldCollation(rexNode,sets)) + +val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +overWindow.partitionBy.foreach(x=> + 
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name))) + +val preceding = overWindow.preceding.asInstanceOf[Literal] +val following = overWindow.following.asInstanceOf[Literal] + +val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo] + +val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING) +val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING) + +rexBuilder.makeOver( + relDataType, + operator, + aggExprs, + partitionKeys, + orderKeys.build, + lowerBound, + upperBound, + isPhysical, + true, + false) + } + + private def createBound( +relBuilder: RelBuilder, +precedingValue: Long, --- End diff -- `precedingValue` -> `value`
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977656#comment-15977656 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112479103 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -586,6 +602,13 @@ trait ImplicitExpressionOperations { * to [[ImplicitExpressionOperations]]. */ trait ImplicitExpressionConversions { + + implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue) + implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue) + + implicit val CURRENT_ROW = toRowInterval(0L) + implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue) --- End diff -- isn't a range interval of `0` a valid range to use? I would mean all rows that arrived in the same millisecond as the current row, no? Should we use `-1` as constant for current row instead? > Integrating the OVER windows in the Table API > - > > Key: FLINK-6228 > URL: https://issues.apache.org/jira/browse/FLINK-6228 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: sunjincheng >Assignee: sunjincheng > > Syntax: > {code} > table >.overWindows( > (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy > order_by_expression] > (preceding > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW) > [following > UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW] > as alias,...[n]) >) > .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n]) > {code} > Implement restrictions: > * All OVER clauses in the same SELECT clause must be exactly the same. > * The PARTITION BY clause is optional (no partitioning results in single > threaded execution). 
> * The ORDER BY clause: until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference 'rowtime'/'proctime' (for stream) or a specific time field (for batch). > * FOLLOWING is not supported. > User interface design document [See | https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
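fhueske's objection above — that a range interval of `0` is itself a meaningful bound ("all rows that arrived in the same millisecond as the current row") — can be made concrete with a small sketch. All names here are hypothetical, not the actual Flink API: encoding CURRENT_RANGE as a 0 ms interval makes it indistinguishable from a genuine zero-length range, whereas a negative sentinel such as `-1` cannot clash with any real interval length.

```scala
// Hypothetical sketch of the sentinel question discussed above.
// UnboundedRange mirrors the PR's use of Long.MaxValue; CurrentRangeSentinel
// is the suggested -1 marker, which cannot collide with a real interval.
object RangeConstants {
  val UnboundedRange: Long = Long.MaxValue
  val CurrentRangeSentinel: Long = -1L // hypothetical replacement for 0

  // With -1 as the marker, a 0 ms range stays usable as an actual range.
  def isCurrentRange(bound: Long): Boolean = bound == CurrentRangeSentinel
  def isUnbounded(bound: Long): Boolean = bound == UnboundedRange
}
```

With this encoding, `isCurrentRange(0L)` is false, so a legitimate same-millisecond range is no longer conflated with the CURRENT_ROW marker.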
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570409 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala --- @@ -23,7 +23,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl} import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.core.Window.Group import org.apache.calcite.rel.core.Window -import org.apache.calcite.rex.{RexInputRef} +import org.apache.calcite.rex.{RexInputRef, RexLiteral} --- End diff -- All these changes could be undone if we don't use a literal for the orderBy() expression
[jira] [Commented] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977666#comment-15977666 ] ASF GitHub Bot commented on FLINK-6228: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112469357 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) + } else { +// for batch 
+relBuilder.field(orderName) + } + +orderKeys.add(new RexFieldCollation(rexNode,sets)) + +val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +overWindow.partitionBy.foreach(x=> + partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name))) + +val preceding = overWindow.preceding.asInstanceOf[Literal] +val following = overWindow.following.asInstanceOf[Literal] + +val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo] + +val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING) +val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING) + +rexBuilder.makeOver( + relDataType, + operator, + aggExprs, + partitionKeys, + orderKeys.build, + lowerBound, + upperBound, + isPhysical, + true, + false) + } + + private def createBound( +relBuilder: RelBuilder, +precedingValue: Long, +sqlKind: SqlKind): RexWindowBound = { + +if (precedingValue == Long.MaxValue) { + val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO) + create(unbounded, null) +} else if (precedingValue == 0L) { + val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO) + create(currentRow, null) +} else { + + val returnType = new BasicSqlType( +relBuilder.getTypeFactory.getTypeSystem, +SqlTypeName.DECIMAL) + + val sqlOperator = new SqlPostfixOperator( +sqlKind.name, +sqlKind, +2, +new OrdinalReturnTypeInference(0), +null, +null) + + val operands: Array[SqlNode] = new Array[SqlNode](1) + operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)) + + val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO) + + val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() + expressions.add(relBuilder.literal(precedingValue)) + + val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions) + + create(node, rexNode) +} + } + + override private[flink] def children: Seq[Expression] = Seq() --- 
End diff -- change to `Seq(agg)` to automatically validate the aggregation call and its arguments.
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570917 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala --- @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction} +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase} +import org.apache.flink.types.Row +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class OverWindowITCase extends StreamingWithStateTestBase { + + @Test + def testProcTimeUnBoundedPartitionedRowOver(): Unit = { + +val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +StreamITCase.clear +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'a, 'b, 'c) + +val windowedTable = table + .window( +Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w) + .select('c, 'b.count over 'w as 'mycount) + .select('c, 'mycount) + +val results = windowedTable.toDataStream[Row] +results.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = Seq( + "Hello World,1", "Hello World,2", "Hello World,3", + "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6") +assertEquals(expected.sorted, 
StreamITCase.testResults.sorted) + } + + @Test + def testRowTimeUnBoundedPartitionedRangeOver(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +StreamITCase.testResults = mutable.MutableList() +StreamITCase.clear +env.setParallelism(1) + +val data = Seq( + Left(1405L, (1, 1L, "Hi")), + Left(1400L, (2, 1L, "Hello")), + Left(1402L, (1, 1L, "Hello")), + Left(1402L, (1, 2L, "Hello")), + Left(1402L, (1, 3L, "Hello world")), + Left(1403L, (2, 2L, "Hello world")), + Left(1403L, (2, 3L, "Hello world")), + Right(1420L), + Left(1421L, (1, 4L, "Hello world")), + Left(1422L, (1, 5L, "Hello world")), + Left(1422L, (1, 6L, "Hello world")), + Left(1422L, (1, 7L, "Hello world")), + Left(1423L, (2, 4L, "Hello world")), + Left(1423L, (2, 5L, "Hello world")), + Right(1430L) +) +val table = env + .addSource(new RowTimeSourceFunction[(Int, Long, String)](data)) + .toTable(tEnv).as('a, 'b, 'c) + +val windowedTable = table + .window(Over partitionBy 'a orderBy
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112499806 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -93,28 +96,43 @@ class DataStreamOverAggregate( val orderKeys = overWindow.orderKeys.getFieldCollations -if (orderKeys.size() != 1) { - throw new TableException( -"Unsupported use of OVER windows. The window can only be ordered by a single time column.") -} -val orderKey = orderKeys.get(0) +val timeType = if (!orderKeys.isEmpty) { + if (orderKeys.size() != 1) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered by a single time " + +"column.") + } + val orderKey = orderKeys.get(0) -if (!orderKey.direction.equals(ASCENDING)) { - throw new TableException( -"Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + if (!orderKey.direction.equals(ASCENDING)) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + } + inputType +.getFieldList +.get(orderKey.getFieldIndex) +.getValue.asInstanceOf[TimeModeType] +} else { + val it = logicWindow.constants.listIterator() + if (it.hasNext) { +val item = it.next().getValue +if (item.isInstanceOf[NlsString]) { + val value = item.asInstanceOf[NlsString].getValue + if (value.equalsIgnoreCase("rowtime")) { +new RowTimeType + } else { +new ProcTimeType + } +} + } } val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv) val generator = new CodeGenerator( - tableEnv.getConfig, - false, - inputDS.getType) - -val timeType = inputType - .getFieldList - .get(orderKey.getFieldIndex) - .getValue +tableEnv.getConfig, --- End diff -- indent --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112555311 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala --- @@ -83,3 +83,49 @@ object Session { */ def withGap(gap: Expression): SessionWindow = new SessionWindow(gap) } + +/** + * Helper object for creating an over window. + */ +object Over { --- End diff -- I think we need an equivalent class for the Java Table API, similar to `org.apache.flink.table.api.java.groupWindows.scala`.
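A minimal sketch of what such a Java Table API counterpart to the Scala `Over` object might look like, using String-based expressions in the style of the existing Java group windows. All class and method names below are illustrative assumptions, not the actual Flink API:

```scala
// Hypothetical Java-API-style builder: fields are plain Strings instead of
// Scala expression symbols, mirroring how the Java group windows work.
object JavaOver {
  def partitionBy(expr: String): PartitionedOver = new PartitionedOver(expr)
}

class PartitionedOver(val partitionByExpr: String) {
  def orderBy(field: String): OverWindowWithOrder =
    new OverWindowWithOrder(partitionByExpr, field)
}

class OverWindowWithOrder(val partitionByExpr: String, val orderByExpr: String) {
  def as(alias: String): JavaOverWindow =
    new JavaOverWindow(partitionByExpr, orderByExpr, alias)
}

// Final window definition carrying the three clauses.
class JavaOverWindow(
    val partitionByExpr: String,
    val orderByExpr: String,
    val alias: String)
```

Usage would then read `JavaOver.partitionBy("c").orderBy("procTime").as("w")`, matching the Scala DSL's `Over partitionBy 'c orderBy 'procTime as 'w` step for step.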
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570230 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -93,28 +96,43 @@ class DataStreamOverAggregate( val orderKeys = overWindow.orderKeys.getFieldCollations -if (orderKeys.size() != 1) { - throw new TableException( -"Unsupported use of OVER windows. The window can only be ordered by a single time column.") -} -val orderKey = orderKeys.get(0) +val timeType = if (!orderKeys.isEmpty) { + if (orderKeys.size() != 1) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered by a single time " + +"column.") + } + val orderKey = orderKeys.get(0) -if (!orderKey.direction.equals(ASCENDING)) { - throw new TableException( -"Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + if (!orderKey.direction.equals(ASCENDING)) { +throw new TableException( + "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.") + } + inputType +.getFieldList +.get(orderKey.getFieldIndex) +.getValue.asInstanceOf[TimeModeType] +} else { --- End diff -- this could be removed if we don't use literals for the orderBy() field --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r11245 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -810,6 +810,47 @@ class Table( new WindowedTable(this, window) } + /** --- End diff -- Extend docs to
```
/**
 * Defines over-windows on the records of a table.
 *
 * An over-window defines for each record an interval of records over which aggregation
 * functions can be computed.
 *
 * Example:
 *
 * {{{
 * table
 *   .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
 *   .select('c, 'b.count over 'ow, 'e.sum over 'ow)
 * }}}
 *
 * __Note__: Computing over window aggregates on a streaming table is only a parallel operation
 * if the window is partitioned. Otherwise, the whole stream will be processed by a single
 * task, i.e., with parallelism 1.
 *
 * __Note__: Over-windows for batch tables are currently not supported.
 *
 * @param overWindows windows that specify the record interval over which aggregations are
 *                    computed.
 * @return An OverWindowedTable to specify the aggregations.
 */
```
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112473915 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala --- @@ -810,6 +810,47 @@ class Table( new WindowedTable(this, window) } + /** +* Groups the records of a table by assigning them to windows defined by a time or row interval. +* +* For streaming tables of infinite size, grouping into windows is required to define finite +* groups on which over-based aggregates can be computed. +* +* Over window for batch tables are currently not supported. +* +* @param overWindows windows that specifies how elements are grouped. +* @return Over windowed table +*/ + def window(overWindows: OverWindow*): OverWindowedTable = { + +if (tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw TableException("Over window for batch tables are currently not supported.") +} else { + overWindows.foreach { overWindow => +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name --- End diff -- This check should be done in `OverCall.validateInput()`
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112557404 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() --- End diff -- +space `val sets: util.HashSet` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112570095 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAliasagg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { +this.aggAlias = aggAlias +this + } + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + +val rexBuilder = relBuilder.getRexBuilder + +val operator: SqlAggFunction = agg.toSqlAggFunction() + +val aggReturnType: TypeInformation[_] = agg.resultType + +val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType) + +val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name + +aggExprs.add(relBuilder.field(aggChildName)) + +val orderKeys: ImmutableList.Builder[RexFieldCollation] = + new ImmutableList.Builder[RexFieldCollation]() + +val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]() +val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name + +val rexNode = + if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) { +// for stream +relBuilder.literal(orderName) + } else { +// for batch +relBuilder.field(orderName) + } + +orderKeys.add(new RexFieldCollation(rexNode,sets)) + +val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() +overWindow.partitionBy.foreach(x=> + 
partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name))) + +val preceding = overWindow.preceding.asInstanceOf[Literal] +val following = overWindow.following.asInstanceOf[Literal] + +val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo] + +val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING) +val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING) + +rexBuilder.makeOver( + relDataType, + operator, + aggExprs, + partitionKeys, + orderKeys.build, + lowerBound, + upperBound, + isPhysical, + true, + false) + } + + private def createBound( +relBuilder: RelBuilder, +precedingValue: Long, +sqlKind: SqlKind): RexWindowBound = { + +if (precedingValue == Long.MaxValue) { + val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO) + create(unbounded, null) +} else if (precedingValue == 0L) { + val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO) + create(currentRow, null) +} else { + + val returnType = new BasicSqlType( +relBuilder.getTypeFactory.getTypeSystem, +SqlTypeName.DECIMAL) + + val sqlOperator = new SqlPostfixOperator( +sqlKind.name, +sqlKind, +2, +new OrdinalReturnTypeInference(0), +null, +null) + + val operands: Array[SqlNode] = new Array[SqlNode](1) + operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO)) + + val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO) + + val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]() + expressions.add(relBuilder.literal(precedingValue)) + + val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions) + + create(node, rexNode) +} + } + + override private[flink] def children: Seq[Expression] = Seq() --- End diff -- maybe we can check the partitionBy expressions here as well? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112495783 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala --- @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend case expr if !expr.valid => u case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp") case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i")) +case over: OverCall if null != over.aggAlias => --- End diff -- We don't need this if we handle the aggregation alias as a regular `Alias` expression
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112457606 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala --- @@ -190,8 +190,8 @@ object ProjectionTranslator { def expandProjectList( exprs: Seq[Expression], parent: LogicalNode, - tableEnv: TableEnvironment) -: Seq[Expression] = { + tableEnv: TableEnvironment, + overWindows: OverWindow*): Seq[Expression] = { --- End diff -- Please use `Array` instead of varargs.
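The difference the reviewer is pointing at can be sketched in isolation (hypothetical method and type names, not the PR's actual code): a trailing vararg parameter forces callers who already hold a collection to spread it with `: _*`, while an explicit `Array` parameter keeps the signature unambiguous at call sites.

```scala
// Hypothetical comparison of the two signatures under discussion.
object VarargsVsArray {
  // Vararg form: callers with a collection must spread it with `: _*`.
  def expandWithVarargs(exprs: Seq[String], overWindows: String*): Int =
    exprs.size + overWindows.size

  // Array form: the collection is passed as-is, no spread syntax needed.
  def expandWithArray(exprs: Seq[String], overWindows: Array[String]): Int =
    exprs.size + overWindows.length
}
```

Both compute the same result; the `Array` version simply makes the "many windows" argument explicit rather than variadic.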
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112495160 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala --- @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression } /** + * Over expression for calcite over transform. + * + * @param agg over-agg expression + * @param aggAlias agg alias for following `select()` clause. + * @param overWindowAlias over window alias + * @param overWindow over window + */ +case class OverCall( +agg: Aggregation, +var aggAlias: Expression, +overWindowAlias: Expression, +var overWindow: OverWindow = null) extends Expression { + + private[flink] def as(aggAlias: Expression): OverCall = { --- End diff -- The proper translation can be done in `ProjectTranslator.translateOverWindow()`
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112501222 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala --- @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.typeutils + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.calcite.sql.`type`.SqlTypeName +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.Types._ +import org.apache.flink.table.api.TableException + +object SqlTypeUtils { --- End diff -- Util class can be removed. We can create a `RelDataType` from a `TypeInformation` as follows: ``` relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112469357

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression }

/**
 * Over expression for calcite over transform.
 *
 * @param agg             over-agg expression
 * @param aggAlias        agg alias for following `select()` clause.
 * @param overWindowAlias over window alias
 * @param overWindow      over window
 */
case class OverCall(
    agg: Aggregation,
    var aggAlias: Expression,
    overWindowAlias: Expression,
    var overWindow: OverWindow = null) extends Expression {

  private[flink] def as(aggAlias: Expression): OverCall = {
    this.aggAlias = aggAlias
    this
  }

  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {

    val rexBuilder = relBuilder.getRexBuilder

    val operator: SqlAggFunction = agg.toSqlAggFunction()

    val aggReturnType: TypeInformation[_] = agg.resultType

    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)

    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name

    aggExprs.add(relBuilder.field(aggChildName))

    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
      new ImmutableList.Builder[RexFieldCollation]()

    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name

    val rexNode =
      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
        // for stream
        relBuilder.literal(orderName)
      } else {
        // for batch
        relBuilder.field(orderName)
      }

    orderKeys.add(new RexFieldCollation(rexNode, sets))

    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    overWindow.partitionBy.foreach(x=>
      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))

    val preceding = overWindow.preceding.asInstanceOf[Literal]
    val following = overWindow.following.asInstanceOf[Literal]

    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]

    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)

    rexBuilder.makeOver(
      relDataType,
      operator,
      aggExprs,
      partitionKeys,
      orderKeys.build,
      lowerBound,
      upperBound,
      isPhysical,
      true,
      false)
  }

  private def createBound(
      relBuilder: RelBuilder,
      precedingValue: Long,
      sqlKind: SqlKind): RexWindowBound = {

    if (precedingValue == Long.MaxValue) {
      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
      create(unbounded, null)
    } else if (precedingValue == 0L) {
      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
      create(currentRow, null)
    } else {

      val returnType = new BasicSqlType(
        relBuilder.getTypeFactory.getTypeSystem,
        SqlTypeName.DECIMAL)

      val sqlOperator = new SqlPostfixOperator(
        sqlKind.name,
        sqlKind,
        2,
        new OrdinalReturnTypeInference(0),
        null,
        null)

      val operands: Array[SqlNode] = new Array[SqlNode](1)
      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))

      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)

      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
      expressions.add(relBuilder.literal(precedingValue))

      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)

      create(node, rexNode)
    }
  }

  override private[flink] def children: Seq[Expression] = Seq()

--- End diff --

change to `Seq(agg)` to automatically validate the aggregation call and its arguments.
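For context, the suggested change is a one-line edit to `OverCall` (assuming, as elsewhere in flink-table, that expression validation is driven by traversing `children`):

```scala
// In OverCall (call.scala): expose the aggregation as a child expression so
// that the generic expression-tree validation also visits the agg call and
// its arguments; an empty Seq() hides them from validation.
override private[flink] def children: Seq[Expression] = Seq(agg)
```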
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112488004

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
@@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {

  /**
   * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
   */
  private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall

  /**
   * Because SqlAggFunction from Calcite's AggCallImpl is invisible,

--- End diff --

Thanks for the explanation for this method. Can you update the doc to `Returns the SqlAggFunction for this Aggregation` and change the method name to `getSqlAggFunction()`? The method does not convert the Aggregation because the returned SqlAggFunction does not contain the arguments.
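The suggested rename would look roughly like this (the implicit `RelBuilder` parameter is an assumption, inferred from the call site `agg.toSqlAggFunction()` inside `OverCall.toRexNode`):

```scala
/**
 * Returns the SqlAggFunction for this Aggregation.
 */
private[flink] def getSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
```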
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112564205

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression }

/**
 * Over expression for calcite over transform.
 *
 * @param agg             over-agg expression
 * @param aggAlias        agg alias for following `select()` clause.
 * @param overWindowAlias over window alias
 * @param overWindow      over window
 */
case class OverCall(

--- End diff --

We also need to validate other properties of the overWindow:
- check that the partitionBy expressions are valid fields in the input.
- If a partitionBy expression is not a field, we would need to push the expression into a Project before the Project with the overWindow and reference the new field. This would need to happen in `Table.window()`. I think for now we can have the restriction that only field references are allowed expressions.
- validate that `preceding` and `following` are literals
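A sketch of the suggested checks, assuming flink-table's `ValidationResult`/`ValidationFailure`/`ValidationSuccess` types are available here; the method name and its placement are illustrative only, not the actual API:

```scala
// Hypothetical validation for OverCall / Table.window(): for now only plain
// field references are accepted as partition keys, and the window bounds
// must be literals.
private[flink] def validateOverWindow(): ValidationResult = {
  val nonFields = overWindow.partitionBy.filterNot(_.isInstanceOf[UnresolvedFieldReference])
  if (nonFields.nonEmpty) {
    ValidationFailure(s"partitionBy only supports field references, got: $nonFields")
  } else if (!overWindow.preceding.isInstanceOf[Literal]) {
    ValidationFailure("preceding must be a literal")
  } else if (!overWindow.following.isInstanceOf[Literal]) {
    ValidationFailure("following must be a literal")
  } else {
    ValidationSuccess
  }
}
```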
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112561580

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ [same OverCall block as above, quoted up to:]

    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    overWindow.partitionBy.foreach(x=>

--- End diff --

Add a space: `foreach( x =>`
[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3743#discussion_r112567644

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
@@ -49,6 +57,128 @@ [same OverCall block as above, quoted up to:]

  private def createBound(
      relBuilder: RelBuilder,
      precedingValue: Long,
      sqlKind: SqlKind): RexWindowBound = {

    if (precedingValue == Long.MaxValue) {

--- End diff --

Please use the constants defined in `expressionDsl.scala` for the checks
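The suggestion, sketched with named DSL boundary constants; the constant names (`UNBOUNDED_ROW`, `CURRENT_ROW`) and how their values unwrap are assumptions about `expressionDsl.scala` on this branch:

```scala
// createBound() with named constants instead of the magic values
// Long.MaxValue and 0L (constant names are assumptions).
if (precedingValue == UNBOUNDED_ROW.value.asInstanceOf[Long]) {
  create(SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO), null)
} else if (precedingValue == CURRENT_ROW.value.asInstanceOf[Long]) {
  create(SqlWindow.createCurrentRow(SqlParserPos.ZERO), null)
} else {
  // bounded PRECEDING/FOLLOWING branch unchanged
  ...
}
```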