[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147947#comment-16147947 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3641
[KeyedProcessOperator](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java) would be one of those.

> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
> -
>
> Key: FLINK-5654
> URL: https://issues.apache.org/jira/browse/FLINK-5654
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: radu
> Fix For: 1.3.0
>
>
> The goal of this issue is to add support for OVER RANGE aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
>   a,
>   SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
>     HOUR PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1'
>     HOUR PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5657)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
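For readers who want to check results against the query in the issue description, the following is a naive reference evaluator of the frame it describes, written as plain Java with no Flink dependency. It assumes standard SQL RANGE semantics (both frame bounds inclusive, and rows with the same timestamp are peers that see each other). `InputRow` and `OverRangeReference` are hypothetical names, and processing time is modeled as a plain `long` in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

// One row of the hypothetical stream `myStream`: columns a, b, c plus the
// processing time (millis) at which the row was observed.
final class InputRow {
    final String a;
    final long b;
    final String c;
    final long procTime;

    InputRow(String a, long b, String c, long procTime) {
        this.a = a;
        this.b = b;
        this.c = c;
        this.procTime = procTime;
    }
}

final class OverRangeReference {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Naive O(n^2) oracle: for each row r, aggregate b over all rows s with
    // s.c == r.c and r.procTime - 1h <= s.procTime <= r.procTime. Rows sharing
    // r's timestamp (its peers) belong to r's frame, as RANGE requires.
    // Returns one {sumB, minB} pair per input row, in input order.
    static List<long[]> evaluate(List<InputRow> rows) {
        List<long[]> results = new ArrayList<>();
        for (InputRow r : rows) {
            long sum = 0;
            long min = Long.MAX_VALUE;
            for (InputRow s : rows) {
                boolean samePartition = s.c.equals(r.c);
                boolean inFrame = s.procTime >= r.procTime - ONE_HOUR_MS
                        && s.procTime <= r.procTime;
                if (samePartition && inFrame) {
                    sum += s.b;
                    min = Math.min(min, s.b);
                }
            }
            // r is always inside its own frame, so min is always set.
            results.add(new long[] {sum, min});
        }
        return results;
    }
}
```

Because it is quadratic and keeps every row forever, this is only useful as a test oracle, not as a streaming operator.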
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146895#comment-16146895 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user narendraj9 commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske Thanks for the link to the documentation. I have gone through it. Can you also share a link to the class that manages a RichProcessFunction instance and makes sure that onTimer() and processElement() calls are not interleaved?
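To illustrate the guarantee asked about here: as far as we understand the Flink runtime of that period, the task thread processed records while holding a task-wide lock (the checkpoint lock), the processing-time service fired timers while holding the same lock, and KeyedProcessOperator simply forwards to the function's processElement() and onTimer(). The sketch below models only that locking idea in plain Java; `CountingFunction` and `SerializingOperator` are hypothetical and are not Flink classes.

```java
// Hypothetical user function with plain, unsynchronized state. It records
// whether a call ever starts while another call is still in progress.
final class CountingFunction {
    long elements = 0;
    long timers = 0;
    boolean overlapDetected = false;
    private boolean inCall = false;

    void processElement(long value) { enter(); elements++; exit(); }

    void onTimer(long timestamp) { enter(); timers++; exit(); }

    private void enter() {
        if (inCall) overlapDetected = true;
        inCall = true;
    }

    private void exit() { inCall = false; }
}

// Simplified stand-in for the runtime side: the input path and the timer path
// take the same lock before calling into the function, so the two callbacks
// can never run at the same time.
final class SerializingOperator {
    private final Object lock = new Object();
    final CountingFunction fn = new CountingFunction();

    void onElement(long value) {
        synchronized (lock) { fn.processElement(value); }
    }

    void onProcessingTime(long timestamp) {
        synchronized (lock) { fn.onTimer(timestamp); }
    }

    // Feeds n elements from one thread while firing n timers from another.
    static SerializingOperator runConcurrently(int n) {
        SerializingOperator op = new SerializingOperator();
        Thread input = new Thread(() -> { for (int i = 0; i < n; i++) op.onElement(i); });
        Thread timer = new Thread(() -> { for (int i = 0; i < n; i++) op.onProcessingTime(i); });
        input.start();
        timer.start();
        try {
            input.join();
            timer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op;
    }
}
```

Without the two `synchronized` blocks, the counts or the overlap check would likely fail under load; with them, the function author does not need any synchronization of their own.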
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16146834#comment-16146834 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3641
This might answer some questions: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1614#comment-1614 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user narendraj9 commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske Hi, I am trying to understand how objects of the RichProcessFunction class are managed. For example, you mentioned that onTimer() and processElement() won't be called at the same time. Could you share a document, link, or code that I can read?
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972223#comment-15972223 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user huawei-flink closed the pull request at:
https://github.com/apache/flink/pull/3459
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971520#comment-15971520 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3459
Hi @huawei-flink, the feature was contributed in a follow up PR. Could you close this PR? Thanks!
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951563#comment-15951563 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/3590
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951562#comment-15951562 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/3641
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951561#comment-15951561 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/3550
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15951254#comment-15951254 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3641
Thanks for the update @rtudoran. I will clean that up and merge this PR. Thanks, Fabian
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950841#comment-15950841 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske For some reason there was a strange conflict in the SQLITCase class. I marked it as read. As this final commit no longer contains any tests in the SQLITCase class, you can simply ignore that class when you integrate this PR.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950838#comment-15950838 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske Based on the discussions on the PR, this commit includes:
- an implementation based on onTimer() in which multiple events arriving at the same proctime get the same accumulator values
- removal of the ITCase tests
- consolidation of the proctime tests on the test harness framework
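A minimal, single-key sketch of the onTimer()-based approach listed first in this commit: processElement() only buffers the row and requests a timer for the next millisecond, and onTimer() retracts expired rows, adds every row of that millisecond, and emits them all with the same SUM/MIN values. It also shows the trade-off discussed in this thread: results come out 1 ms later and as a batch, but the two callbacks never have to coordinate. Everything here is hypothetical plain Java, not the code of the PR: the TreeMap stands in for keyed state, `registeredTimers` for the timer service, MIN is made retractable with a value-count map (our choice), and the frame's lower bound is treated as inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Per-key sketch of a processing-time OVER RANGE aggregate (SUM and MIN of b).
final class ProcTimeRangeOverSketch {
    private final long precedingMs;
    // b values of buffered rows, grouped by arrival processing time.
    private final TreeMap<Long, List<Long>> rowsByTime = new TreeMap<>();
    // Accumulators over the rows currently inside the frame.
    private long sum = 0;
    private final TreeMap<Long, Integer> valueCounts = new TreeMap<>(); // MIN with retraction
    // Timers requested so far; a set, so one timer per distinct arrival time.
    final TreeSet<Long> registeredTimers = new TreeSet<>();

    ProcTimeRangeOverSketch(long precedingMs) {
        this.precedingMs = precedingMs;
    }

    // Only buffers the row and asks for a timer at arrival time + 1 ms.
    void processElement(long b, long currentProcTime) {
        rowsByTime.computeIfAbsent(currentProcTime, t -> new ArrayList<>()).add(b);
        registeredTimers.add(currentProcTime + 1);
    }

    // Emits {b, sumB, minB} for every row that arrived at timestamp - 1.
    List<long[]> onTimer(long timestamp) {
        registeredTimers.remove(timestamp);
        long currentTime = timestamp - 1;
        long limit = currentTime - precedingMs;

        // 1) Retract rows older than the inclusive lower bound of the frame.
        SortedMap<Long, List<Long>> expired = rowsByTime.headMap(limit); // keys < limit
        for (List<Long> values : expired.values()) {
            for (long b : values) retract(b);
        }
        expired.clear(); // removes them from rowsByTime as well

        // 2) Add all rows of the current millisecond, 3) emit them with shared results.
        List<Long> current = rowsByTime.getOrDefault(currentTime, new ArrayList<>());
        for (long b : current) accumulate(b);
        List<long[]> out = new ArrayList<>();
        for (long b : current) out.add(new long[] {b, sum, valueCounts.firstKey()});
        return out;
    }

    private void accumulate(long b) {
        sum += b;
        valueCounts.merge(b, 1, Integer::sum);
    }

    private void retract(long b) {
        sum -= b;
        valueCounts.computeIfPresent(b, (k, c) -> c == 1 ? null : c - 1);
    }
}
```

In a real job there would be one such state per PARTITION BY key. For example, with a 1000 ms range, two rows arriving at t = 100 produce a single timer at 101, and both are emitted with the same sum and min.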
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950771#comment-15950771 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske Great. I was just randomly searching for cases that would be valid in single unit runs, cluster runs, and IDE runs, which apparently have widely different latencies :))
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950761#comment-15950761 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/3641
I agree, let's drop the ITCases for that. The test harness tests should be sufficient.
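Why harness tests are the more reliable option here: a harness lets the test own the processing-time clock, so timers fire exactly when the test advances time, independent of machine or IDE latency. As far as we know, Flink's operator test harnesses (e.g. `KeyedOneInputStreamOperatorTestHarness`) expose this through `setProcessingTime(long)`. The sketch below is a hypothetical, Flink-free model of such a manually driven clock, not the harness itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongConsumer;

// Hypothetical test-only timer service whose clock moves only when the test
// advances it explicitly.
final class ManualProcessingTimeService {
    private long currentTime = 0L;
    private final TreeMap<Long, List<LongConsumer>> timers = new TreeMap<>();

    long currentProcessingTime() {
        return currentTime;
    }

    void registerTimer(long timestamp, LongConsumer callback) {
        timers.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(callback);
    }

    // Moves the clock to `time` and fires every due timer in timestamp order.
    // Timers registered by a callback also fire if they are already due.
    void setProcessingTime(long time) {
        currentTime = time;
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<LongConsumer>> due = timers.pollFirstEntry();
            for (LongConsumer callback : due.getValue()) {
                callback.accept(due.getKey());
            }
        }
    }
}
```

With such a clock, a test can place two rows in the same millisecond on purpose and assert that they receive identical aggregates, which an ITCase running on wall-clock time cannot guarantee.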
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950743#comment-15950743 ] ASF GitHub Bot commented on FLINK-5654:
---
Github user rtudoran commented on the issue:
https://github.com/apache/flink/pull/3641
@fhueske The problem now is that the ITCase test results are quite random, due to the varying latency that can occur in processing time. I think these tests are quite unreliable and we should not rely on them.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950608#comment-15950608 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske Thanks for the feedback. On the one hand, this approach means that the few events that arrive with the same proctime are not computed one at a time but as a batch (all at the same time, 1 ms later); on the other hand, it avoids all the mess of strong synchronization in case there is no explicit, fixed order of execution between the 2 functions (as per my example on the mailing list).
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950562#comment-15950562 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 you're right. That works as well. :-)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950542#comment-15950542 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske Good to know, but I think we can optimize the computation as mentioned above.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950540#comment-15950540 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 I found a workaround for this issue: I will accumulate events in processElement and compute and emit them in onTimer.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950535#comment-15950535 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 Hi @rtudoran, the calls to `processElement()` and `onTimer()` are guarded by a lock, so the two methods are never called at the same time.
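Fabian's answer can be modelled with a small, hypothetical Scala sketch: both callbacks take the same lock, so draining the buffered value in `onTimer` can never interleave with `processElement` adding to it. The class and method names only mirror the discussion; this is not Flink's operator code, and the plain JVM monitor used here is an assumption standing in for whatever lock the runtime actually holds.

```scala
// Hypothetical model of an operator whose two callbacks share one lock (not Flink code).
final class GuardedOperator {
  private val lock = new AnyRef
  private var pending = 0L // added to by processElement, drained by onTimer
  private var acc = 0L     // value that onTimer has already accumulated

  def processElement(v: Long): Unit = lock.synchronized { pending += v }

  // Read-then-reset of `pending` is atomic with respect to processElement.
  def onTimer(): Unit = lock.synchronized {
    acc += pending
    pending = 0L
  }

  def total: Long = lock.synchronized { acc + pending }
}

object GuardedOperatorDemo {
  // Runs n element calls and n timer calls on two threads, then flushes once.
  def run(n: Int): Long = {
    val op = new GuardedOperator
    val elements = new Thread(() => (1 to n).foreach(_ => op.processElement(1L)))
    val timers = new Thread(() => (1 to n).foreach(_ => op.onTimer()))
    elements.start(); timers.start()
    elements.join(); timers.join()
    op.onTimer()
    op.total // equals n: no update is lost because the callbacks never overlap
  }

  def main(args: Array[String]): Unit = println(run(100000))
}
```

Without the shared lock, `acc += pending; pending = 0L` could interleave with `pending += v` and silently drop increments; with it, the order between the two callbacks may vary, but each one sees a consistent state.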
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950408#comment-15950408 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske I just thought of an example. Assume you have 2 events arriving at consecutive processing times: (Ev1, 1) and (Ev2, 2). When Ev1 arrives, you accumulate it and register a timer at proctime + 1 to emit it. However, at the moment onTimer is called, processElement is also called (for Ev2). This might lead to concurrent access to the accumulator, as one function needs to read it while the other needs to modify it. Is there a clear order of execution in such concurrent scenarios?
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949892#comment-15949892 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 For the harness tests, I will let you know after it is implemented. In case it does not work, I would temporarily change the tests until a new version of the harness is implemented/fixed.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949887#comment-15949887 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske OK, I will implement this and ping you when it is ready. One more question: when you say "The ProcessFunction does also support to register processing time timers via the context object." I guess you mean using ctx.timerService.registerProcessingTimeTimer(curProcTime + 1) instead of ctx.timerService.registerEventTimeTimer(curWatermark + 1)?
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949816#comment-15949816 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 You are right, with processing time no two records arrive at exactly the same time. However, we also retract values at the granularity of milliseconds. Therefore, I think it makes sense to also do the accumulation at this granularity. The other reason for the change would be consistency with event time, as you mentioned. The `ProcessFunction` also supports registering processing time timers via the context object. With the harness, we decouple processing time from system time. I would hope that the harness also triggers a timer when we increment the processing time, but I'm not 100% sure it does.
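Decoupling processing time from system time can be illustrated with a hypothetical manual clock: time only moves when the test sets it, and moving it fires every timer that has become due. This is not Flink's harness or its processing-time service, and `registerTimer` is an invented name; the sketch does not assume the real harness behaves this way, it only shows what such behavior would look like.

```scala
import scala.collection.mutable

// Hypothetical manual processing-time service for tests (not a Flink class).
final class ManualProcessingTime {
  private var current = 0L
  private val timers = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Long => Unit]]

  def currentProcessingTime: Long = current

  def registerTimer(time: Long)(callback: Long => Unit): Unit =
    timers.getOrElseUpdate(time, mutable.ArrayBuffer.empty[Long => Unit]) += callback

  // Moving the clock forward fires all timers with time <= t, oldest first.
  def setProcessingTime(t: Long): Unit = {
    require(t >= current, "processing time must not go backwards")
    current = t
    while (timers.nonEmpty && timers.head._1 <= t) {
      val (time, callbacks) = timers.head
      timers -= time
      callbacks.foreach(cb => cb(time))
    }
  }
}

object ManualClockDemo {
  // Returns the timers fired after setting the clock to 3, and then to 4.
  def run(): (List[Long], List[Long]) = {
    val clock = new ManualProcessingTime
    val fired = mutable.ArrayBuffer.empty[Long]
    clock.setProcessingTime(3)
    clock.registerTimer(clock.currentProcessingTime + 1) { t => fired += t; () }
    clock.setProcessingTime(3) // wall-clock time may pass, but this clock stays at 3
    val afterThree = fired.toList
    clock.setProcessingTime(4) // reaching 4 fires the timer registered for 3 + 1
    (afterThree, fired.toList)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because nothing here reads the system clock, a test built on such a service is deterministic: a "+1 ms" timer fires exactly when the test advances time, regardless of how long the test itself takes to run.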
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949795#comment-15949795 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 Ignore question 1; I found an example in UnboundedEventTimeOverProcessFunction.scala.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949791#comment-15949791 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske Thanks for the example. First of all, I am happy that we agree that we need to emit something for every input :). I was afraid that this would not be the case. Now, regarding the 2 options for the output, I would still prefer the first option. The reason is that, at least in the case of proctime, no 2 events are simultaneous in absolute terms. There is an implicit serialization (also because of how the Flink engine works: one event arrives before the other). It might simply be a side effect of computers being unable to measure system time at a finer granularity that 2 events end up with apparently the same proctime timestamp. However, as you mentioned several times (and I agree), uniform behavior might be a key point. In that case we would indeed implement the behavior you suggested. Let me know again if you are sure about this. I raised my concerns, but I am open to accepting what you suggest. In case you suggest implementing it with proctime, I have 2 questions: 1) Do you know if there is an example of a timer in proctime? (If not, no problem; I will figure it out.) 2) In the case of the harness tests, the correct implementation of the behavior might not match the test. If I write a test that does
{code}
testHarness.setProcessingTime(3)
testHarness.processElement(new StreamRecord(rInput, 1001)) // let us assume that the system time is 1490906681
testHarness.processElement(new StreamRecord(rInput, 2002)) // let us assume that the system time is still 1490906681
testHarness.processElement(new StreamRecord(rInput, 2003)) // let us assume that the system time is now 1490906682
testHarness.processElement(new StreamRecord(rInput, 2004)) // let us assume that the system time is now 1490906682
{code}
then, if we do a COUNT, the output would be ev1 2, ev2 2, ev3 2, ev4 2 instead of all having an associated count of 4 (which would be the behavior you mention). This would be because I assume that the +1 ms timer would be triggered based on system time. In such a case, should we just build a limited test that works?
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949733#comment-15949733 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 Sorry, I think I did not explain the change correctly. I did not mean to group multiple records together. We still emit one row per input row, but given the following input
```
(proc-time, id, a)
(1, a, 1)
(2, b, 2)
(3, c, 3)
(3, d, 4)
(4, e, 5)
```
and the query
```
SELECT id, sum(a) OVER (ORDER BY proctime RANGE BETWEEN 2 PRECEDING AND CURRENT ROW) FROM x
```
the result would not be
```
(a, 1), (b, 3), (c, 6), (d, 10), (e, 14)
```
but
```
(a, 1), (b, 3), (c, 10), (d, 10), (e, 14)
```
Because `c` and `d` arrived at the same time, their aggregates need to be the same. This is actually also how event-time OVER RANGE windows are implemented.
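Fabian's example can be checked with a small, hypothetical Scala sketch that evaluates the query both ways over the same five rows. It treats proc-time values as milliseconds and `2 PRECEDING` as a 2 ms range (assumptions for illustration), and it is plain collection code, not Flink's operator.

```scala
// Hypothetical input row: (proc-time, id, a); not a Flink type.
final case class Event(procTime: Long, id: String, a: Int)

object RangeVsRowsSketch {
  val input: Seq[Event] = Seq(
    Event(1, "a", 1), Event(2, "b", 2), Event(3, "c", 3), Event(3, "d", 4), Event(4, "e", 5))

  // Row-by-row evaluation: each row only sees rows that arrived before it
  // (plus itself) whose timestamp lies within the preceding range.
  def perRow(rows: Seq[Event], preceding: Long): Seq[(String, Int)] =
    rows.zipWithIndex.map { case (r, i) =>
      val visible = rows.take(i + 1).filter(p => p.procTime >= r.procTime - preceding)
      (r.id, visible.map(_.a).sum)
    }

  // RANGE semantics: every row with timestamp in [t - preceding, t] counts,
  // including peers with the same timestamp t that arrived later.
  def range(rows: Seq[Event], preceding: Long): Seq[(String, Int)] =
    rows.map { r =>
      val peers = rows.filter(p => p.procTime >= r.procTime - preceding && p.procTime <= r.procTime)
      (r.id, peers.map(_.a).sum)
    }

  def main(args: Array[String]): Unit = {
    println(perRow(input, 2L)) // c gets 6: d has not been seen yet
    println(range(input, 2L))  // c and d both get 10
  }
}
```

Both variants still produce one output row per input row; they differ only in whether rows sharing a timestamp see each other.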
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949721#comment-15949721 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske You can also check the definition on the Calcite website, https://calcite.apache.org/docs/stream.html: Standard SQL features so-called “analytic functions” that can be used in the SELECT clause. Unlike GROUP BY, these do not collapse records. For each record that goes in, one record comes out. But the aggregate function is based on a window of many rows.
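The distinction in the Calcite definition can be shown with a hypothetical sketch over in-memory rows: a GROUP BY aggregate yields one row per group, while an OVER aggregate yields one row per input row. The `Trade` type, the sample data, and the 2 ms range are assumptions chosen for illustration, loosely echoing the stock-market use case raised in this thread.

```scala
// Hypothetical rows for illustration; not a Flink or Calcite type.
final case class Trade(time: Long, symbol: String, amount: Int)

object GroupByVsOverSketch {
  val trades: Seq[Trade] =
    Seq(Trade(1, "X", 10), Trade(2, "X", 5), Trade(2, "Y", 7), Trade(4, "X", 1))

  // GROUP BY symbol: rows are collapsed into one output row per group.
  def groupBySum(in: Seq[Trade]): Map[String, Int] =
    in.groupBy(_.symbol).map { case (s, rows) => s -> rows.map(_.amount).sum }

  // SUM(amount) OVER (PARTITION BY symbol ORDER BY time
  //   RANGE BETWEEN `preceding` PRECEDING AND CURRENT ROW):
  // exactly one output row per input row.
  def overSum(in: Seq[Trade], preceding: Long): Seq[(Trade, Int)] =
    in.map { t =>
      val window = in.filter(p =>
        p.symbol == t.symbol && p.time >= t.time - preceding && p.time <= t.time)
      (t, window.map(_.amount).sum)
    }

  def main(args: Array[String]): Unit = {
    println(groupBySum(trades))       // 2 rows for 4 inputs
    println(overSum(trades, 2L).size) // 4 rows for 4 inputs
  }
}
```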
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949717#comment-15949717 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @fhueske Thanks for the feedback. I can of course make the modifications you mentioned. However, I do not believe that is the correct behavior (or rather, not for all the needed cases). From my understanding of the OVER semantics, even if the function works on time (proctime/eventtime), we should still emit a value for every incoming event. I have 2 arguments for this:
1) In our scenarios we would use the current implementation, for example, to compute certain statistics for every incoming event, where each statistic covers a defined period of time (this is a highly needed functionality). For example, in a stock market scenario you might ask for the sum of the transactions over the last hour (to check a liquidity threshold for the market), and since the application needs to react to each incoming transaction (e.g. decide whether or not to buy), the behavior you mentioned would not enable such a scenario. Moreover, even if both behaviors were needed, what query could you write to get the behavior you described and distinguish it from the other one?
2) Consider the event-time case, which should behave like proctime: the same reasoning applies there. When you get an event (ev1, time1), you should not emit its output until you know whether another event with the same event time (ev2, time1) arrives later. Basically, you would register a timer for the acceptable watermark/allowed latency, update the accumulator for a specific event time, and emit the result after the allowed latency has passed. Is this the behavior that is (or would be) implemented?
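Point 1 asks for one result per incoming event, where each result aggregates a time range rather than a row count. A minimal sketch of that per-event behavior follows, in plain Scala rather than against Flink's `ProcessFunction` API; the names `PerEventRangeSum`, `Trade`, `run` and `rangeMs` are hypothetical, and events are assumed to arrive in non-decreasing time order, as processing-time timestamps do.

```scala
import scala.collection.mutable

// Hypothetical sketch (plain Scala, not the PR's Flink code): one output per
// incoming event, where each output sums all events with a timestamp in
// [t - rangeMs, t]. Events are assumed to arrive in non-decreasing time order.
object PerEventRangeSum {

  final case class Trade(timeMs: Long, amount: Long)

  def run(trades: Seq[Trade], rangeMs: Long): List[(Trade, Long)] = {
    val window = mutable.Queue.empty[Trade] // trades still inside the range
    val out = mutable.ListBuffer.empty[(Trade, Long)]
    var sum = 0L
    for (t <- trades) {
      window.enqueue(t)
      sum += t.amount
      // Retract trades that fell out of the interval; the current trade
      // itself always stays, so the queue never runs empty here.
      while (window.head.timeMs < t.timeMs - rangeMs) {
        sum -= window.dequeue().amount
      }
      out += ((t, sum)) // emit immediately, once per event
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val hour = 60L * 60L * 1000L
    val trades = List(Trade(0L, 10L), Trade(1000L, 20L), Trade(hour, 5L), Trade(hour + 1, 1L))
    run(trades, hour).foreach { case (t, s) => println(s"${t.timeMs} -> $s") }
  }
}
```

With this behavior, two trades arriving in the same millisecond get different sums (the second one includes the first), which is exactly the point of contention with the per-millisecond RANGE semantics.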
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949408#comment-15949408 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 Oh, I just remembered that the OVER RANGE semantics are slightly different. All rows that arrive in the same millisecond need to get the same aggregation value. So we need not only to retract all records which are too old but also to accumulate all records which are received in the same millisecond. Therefore, we would need to redesign the `ProcessFunction` a bit:
- In `processElement()` we put the new record in the MapState and register a processing time timer for current time + 1. This will create a callback on `onTimer()` when current time + 1 is reached.
- When `onTimer()` is called, we process the rows of timestamp - 1: retract all old values, accumulate all new values, and emit all rows of timestamp - 1. The implementation of `onTimer()` can reuse most of what is currently done in `processElement()`.
Does that make sense @rtudoran? Do you want to make the change? Otherwise, I can also do it before merging. Sorry for noticing this only now :-/ Best, Fabian
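The timer-based redesign fhueske proposes can be modeled outside Flink to show the intended semantics: rows are buffered per processing-time millisecond, and when millisecond `t` is processed, its rows are accumulated together, rows older than the range are retracted, and every row of `t` is emitted with the same value. This is a plain-Scala sketch under that reading, not the eventual operator; `PerMillisecondRangeSum`, `run` and `rangeMs` are hypothetical names, and iterating over the milliseconds in ascending order stands in for the timer that fires at `t + 1`.

```scala
import scala.collection.immutable.TreeMap
import scala.collection.mutable

// Hypothetical sketch (plain Scala, not Flink): all rows of the same
// processing-time millisecond get the same result.
// Input is (arrivalMs, value); output is (arrivalMs, value, rangeSum).
object PerMillisecondRangeSum {

  def run(rows: Seq[(Long, Long)], rangeMs: Long): List[(Long, Long, Long)] = {
    // Rows buffered per millisecond, standing in for MapState[Long, JList[Row]].
    val byTime: TreeMap[Long, Seq[Long]] =
      TreeMap.empty[Long, Seq[Long]] ++ rows.groupBy(_._1).map { case (t, rs) => t -> rs.map(_._2) }

    val active = mutable.Queue.empty[Long] // milliseconds accumulated, not yet retracted
    val out = mutable.ListBuffer.empty[(Long, Long, Long)]
    var sum = 0L
    // Ascending order plays the role of the timer firing at t + 1 for rows of t.
    for ((t, values) <- byTime) {
      sum += values.sum // accumulate all rows of millisecond t at once
      active.enqueue(t)
      while (active.head < t - rangeMs) {
        sum -= byTime(active.dequeue()).sum // retract rows that are too old
      }
      values.foreach(v => out += ((t, v, sum))) // same value for every row of t
    }
    out.toList
  }
}
```

For rows `(0, 1)`, `(0, 2)` both arriving at millisecond 0, both are emitted with the sum 3, whereas a per-event implementation would emit 1 and then 3.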
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949379#comment-15949379 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3641 Thanks for the update @rtudoran. Will merge this :-)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948734#comment-15948734 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @sunjincheng121 @fhueske I have implemented all the changes you mentioned:
- switched from iterators to index-based list traversals
- keep the last proctime in a ValueState to avoid searching for elements to be removed several times when events arrive within the same timestamp
- moved the harness test into a standalone class named proctimetest, in order to support potential future tests that do proctime verifications
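The second change, remembering the last processing time so that expired rows are not searched for repeatedly when several events share a timestamp, can be illustrated with a small plain-Scala model. This is a sketch of the idea only, not the PR's code: `LastProcTimeCleanup`, `State`, `process` and `scans` are hypothetical names, a local `var` stands in for the ValueState, and each event still gets its own running sum, as in the implementation at that point of the review.

```scala
import scala.collection.mutable

// Hypothetical sketch (plain Scala, not the PR's code): the search for expired
// rows runs only when an element arrives at a different millisecond than the
// previous one.
object LastProcTimeCleanup {

  final class State(rangeMs: Long) {
    private val rowsByTime = mutable.TreeMap.empty[Long, mutable.ListBuffer[Long]]
    private var lastProcTime = Long.MinValue // stands in for a ValueState[Long]
    private var sum = 0L
    var scans = 0 // number of times the expired-rows search actually ran

    /** Adds `value` arriving at `now`; returns the running sum over [now - rangeMs, now]. */
    def process(now: Long, value: Long): Long = {
      if (now != lastProcTime) {
        scans += 1
        // Collect first, then remove, so the map is not modified while iterating.
        val expired = rowsByTime.keysIterator.takeWhile(_ < now - rangeMs).toList
        expired.foreach(t => sum -= rowsByTime.remove(t).map(_.sum).getOrElse(0L))
        lastProcTime = now
      }
      rowsByTime.getOrElseUpdate(now, mutable.ListBuffer.empty[Long]) += value
      sum += value
      sum
    }
  }
}
```

Three events at millisecond 0 followed by one at millisecond 20 (with a 10 ms range) trigger only two scans instead of four.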
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948681#comment-15948681 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108871599
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
@@ -696,6 +713,205 @@ class SqlITCase extends StreamingWithStateTestBase {
       "6,8,Hello world,51,9,5,9,1")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testAvgSumAggregatationPartition(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," +
+      "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable"
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,0,0",
+      "2,1,1",
+      "2,1,3",
+      "3,3,3",
+      "3,3,7",
+      "3,4,12",
+      "4,6,13",
+      "4,6,6",
+      "4,7,21",
+      "4,7,30",
+      "5,10,10",
+      "5,10,21",
+      "5,11,33",
+      "5,11,46",
+      "5,12,60")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAvgSumAggregatationNonPartition(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," +
+      "MIN(c) OVER (ORDER BY procTime()" +
+      "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable"
+
+    val t = StreamTestData.get5TupleDataStream(env)
+      .toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+
+    tEnv.registerTable("MyTable", t)
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1,0",
+      "2,2,0",
+      "2,3,0",
+      "3,4,0",
+      "3,5,0",
+      "3,6,0",
+      "4,7,0",
+      "4,8,0",
+      "4,9,0",
+      "4,10,0",
+      "5,11,0",
+      "5,12,0",
+      "5,13,0",
+      "5,14,0",
+      "5,15,0")
+
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+  @Test
+  def testCountAggregatationProcTimeHarnessPartitioned(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(1)
+
+    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+      INT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      LONG_TYPE_INFO),
+      Array("a","b","c","d","e"))
+
+    val rTA = new RowTypeInfo(Array[TypeInformation[_]](
+      LONG_TYPE_INFO), Array("count"))
+
+    val processFunction = new KeyedProcessOperator[String,Row,Row](
+      new ProcTimeBoundedProcessingOverProcessFunction(
+        Array(new CountAggFunction),
+        Array(1),
+        5,
+        rTA,
+        1000,
+        rT))
+
+    val rInput:Row = new Row(5)
+    rInput.setField(0, 1)
+    rInput.setField(1, 11L)
+    rInput.setField(2, 1)
+    rInput.setField(3, "aaa")
+    rInput.setField(4, 11L)
+
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[String,Row,Row](
+      processFunction,
+      new TupleRowSelector(3),
+      BasicTypeInfo.STRING_TYPE_INFO)
+
+    testHarness.open();
+
+    testHarness.setProcessingTime(3)
+
+    // timestamp is ignored in processing time
+    testHarness.processElement(new StreamRecord(rInput, 1001))
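The expected rows of `testAvgSumAggregatationPartition` can be cross-checked by hand: if all 15 rows arrive within the 10-second range, each frame covers every earlier row of the same partition, so the test reduces to a running per-partition SUM and AVG. The sketch below recomputes them in plain Scala. The `(a, c)` input pairs are an assumption inferred from the expected output, not copied from `StreamTestData`, and `ExpectedPartitionedResults` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical cross-check (plain Scala, no Flink): recomputes the expected rows
// of testAvgSumAggregatationPartition, assuming every frame covers all earlier
// rows of the same partition (the whole input arrives within 10 seconds).
object ExpectedPartitionedResults {

  // (a, c) pairs implied by the expected output; an assumption about the test data.
  val input: List[(Int, Int)] = List(
    1 -> 0, 2 -> 1, 2 -> 2, 3 -> 3, 3 -> 4, 3 -> 5, 4 -> 6, 4 -> 7,
    4 -> 8, 4 -> 9, 5 -> 10, 5 -> 11, 5 -> 12, 5 -> 13, 5 -> 14)

  def expected(rows: List[(Int, Int)]): List[String] = {
    val perKey = mutable.Map.empty[Int, (Int, Int)] // a -> (count, sum)
    rows.map { case (a, c) =>
      val (count, sum) = perKey.getOrElse(a, (0, 0))
      val updated = (count + 1, sum + c)
      perKey(a) = updated
      // AVG over an INT column uses integer division, as the expected output implies
      s"$a,${updated._2 / updated._1},${updated._2}"
    }
  }

  def main(args: Array[String]): Unit =
    expected(input).sorted.foreach(println)
}
```

Note that these values assume each row of a partition receives its own result. Under the per-millisecond RANGE semantics fhueske describes in a later comment, rows of the same partition that arrive in the same millisecond would share one value, so the expected list would only hold if those rows land in distinct milliseconds.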
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948676#comment-15948676 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108871122
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * Process Function used for the aggregate in bounded proc-time OVER window
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ *   used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount Is used to indicate fields in the current element to forward
+ * @param rowTypeInfo Is used to indicate the field schema
+ * @param timeBoundary Is used to indicate the processing time boundaries
+ * @param inputType It is used to mark the Row type of the input
+ */
+class ProcTimeBoundedProcessingOverProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val forwardedFieldCount: Int,
+  private val rowTypeInfo: RowTypeInfo,
+  private val timeBoundary: Long,
+  private val inputType: TypeInformation[Row])
+  extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    // We keep the elements received in a list state
+    // together with the ingestion time in the operator
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+    val stateDescriptor: ValueStateDescriptor[Row] =
+      new ValueStateDescriptor[Row]("overState", rowTypeInfo)
+    accumulatorState =
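In `open()`, the output row is sized as `forwardedFieldCount + aggregates.length`. The quoted diff stops before the fields are filled in, so the following plain-Scala sketch only assumes the natural layout: the first `forwardedFieldCount` input fields are copied, followed by one field per aggregate in declaration order. Arrays stand in for Flink's `Row`, and `OutputRowLayout` and `buildOutput` are hypothetical names.

```scala
// Hypothetical sketch (plain Scala arrays instead of Flink's Row): the output
// layout suggested by `new Row(forwardedFieldCount + aggregates.length)`,
// assuming forwarded input fields come first, then one field per aggregate.
object OutputRowLayout {

  def buildOutput(input: Array[Any], forwardedFieldCount: Int, aggResults: Array[Any]): Array[Any] = {
    require(forwardedFieldCount <= input.length, "cannot forward more fields than the input has")
    val output = new Array[Any](forwardedFieldCount + aggResults.length)
    Array.copy(input, 0, output, 0, forwardedFieldCount)                      // forwarded fields
    Array.copy(aggResults, 0, output, forwardedFieldCount, aggResults.length) // aggregate results
    output
  }
}
```

With the arguments used in the harness test (five forwarded fields and a single `CountAggFunction`), the output would have six fields: the five input values followed by the count.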
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948651#comment-15948651 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108867389 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948641#comment-15948641 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108866308 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948634#comment-15948634 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108865827 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948601#comment-15948601 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108863010 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948602#comment-15948602 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108863035 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) --- End diff -- The parameter asks for AggregateFunctions that support retraction. There are some functions (like min and max) that are less efficient if they need to prepare for retraction. Therefore they have a retractable and a non-retractable version. The parameter decides which version is returned. 
> Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
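The bounded RANGE semantics in the issue description can be sketched per key as follows: each arriving row gets the aggregate over all rows whose processing time lies in `[now - boundary, now]`. This is a simplification under stated assumptions, not the PR's operator. A `TreeMap` stands in for the keyed `MapState[Long, JList[Row]]` used in the diff, and the caller passes timestamps explicitly instead of relying on timers. SUM over `Long` values replaces generic aggregates, and `BoundedRangeSum` is a hypothetical name.

```scala
import scala.collection.mutable

/** Hypothetical per-key SUM over a processing-time RANGE of `precedingMillis`. */
final class BoundedRangeSum(precedingMillis: Long) {
  // Arrival timestamp -> values that arrived at that timestamp (mirrors MapState[Long, JList[Row]]).
  private val rowsByTime = mutable.TreeMap.empty[Long, mutable.ListBuffer[Long]]
  private var sum = 0L

  /** Adds `value` arriving at `now` and returns SUM over [now - precedingMillis, now]. */
  def process(now: Long, value: Long): Long = {
    // Retract every bucket strictly older than the inclusive lower bound.
    val expired = rowsByTime.keysIterator.takeWhile(_ < now - precedingMillis).toList
    expired.foreach { t =>
      rowsByTime(t).foreach(v => sum -= v)
      rowsByTime -= t
    }
    // Accumulate the new row and emit the current aggregate.
    rowsByTime.getOrElseUpdate(now, mutable.ListBuffer.empty[Long]) += value
    sum += value
    sum
  }
}
```

Eviction is where retraction becomes necessary. SUM can simply subtract, but MIN/MAX need the retractable variant discussed in the review.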
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948599#comment-15948599 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108862890 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +/** + * Process Function used for the aggregate in bounded proc-time OVER window + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val 
rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state --- End diff -- ok
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948566#comment-15948566 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108857410 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) --- End diff -- OK... I modified this. However, I am not sure I fully understand the impact of this parameter in this processing-time case.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948563#comment-15948563 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108856694 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, --- End diff -- precedingTimeBoundary seems to be the combination of the 3 opinions :)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948555#comment-15948555 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108856359 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { + case bd: java.math.BigDecimal => bd.longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + + // get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = +// partitioned aggregation +if (partitionKeys.nonEmpty) { + + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( +namedAggregates, +inputType, +timeBoundary) + + inputDS + .keyBy(partitionKeys: _*) + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] +} else { // non-partitioned aggregation + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( --- End diff -- ok... 
:)
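The lower-bound lookup in `createTimeBoundedProcessingTimeOverWindow` relies on index arithmetic. The window's bound offset is a `RexInputRef` whose index lies past the input's fields. Subtracting the input field count yields a position in `logicWindow.constants`, and that constant is unwrapped from a `java.math.BigDecimal`. The hypothetical helper below isolates that step. It assumes, as the `timeBoundary` scaladoc in `AggregateUtil` states, that the value is in milliseconds. It throws `IllegalArgumentException` instead of Flink's `TableException` so that it stays self-contained.

```scala
/** Hypothetical isolation of the bound-resolution step from the diff. */
object BoundResolution {
  /**
   * @param offsetRefIndex  index of the RexInputRef used as the lower-bound offset
   * @param inputFieldCount number of fields in the window's input row type
   * @param constants       the window's constant list (stand-in for logicWindow.constants)
   */
  def lowerBoundMillis(offsetRefIndex: Int, inputFieldCount: Int, constants: Seq[Any]): Long = {
    // Refs beyond the input fields point into the constants list.
    val lowerBoundIndex = offsetRefIndex - inputFieldCount
    constants(lowerBoundIndex) match {
      case bd: java.math.BigDecimal => bd.longValue()
      case other =>
        throw new IllegalArgumentException(s"OVER Window boundaries must be numeric, got: $other")
    }
  }
}
```

For example, with a 5-field input and a 10-second bound stored as the first constant, the offset ref has index 5 and resolves to `10000`.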
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948226#comment-15948226 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108824185 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -140,6 +140,20 @@ under the License. ${project.version} test + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} + test-jar + test + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + test-jar + --- End diff -- Thanks for the explanation. @rtudoran @fhueske
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948225#comment-15948225 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108824115 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +716,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment --- End diff -- Thanks for the explanation. +1
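The expected lists in the two integration tests above can be derived by hand. The sketch below assumes that all 15 rows arrive within the 10-second window. That is plausible with parallelism 1 and a small bounded source, but processing time makes it timing-dependent. Under that assumption every OVER aggregate degenerates into a running aggregate. The `(a, c)` pairs used here (`a` = 1 once, 2 twice, ..., 5 five times; `c` = 0 to 14) are inferred from the expected results, not read from `StreamTestData`. AVG over the INT column `c` appears to truncate to an integer (e.g. `3,4,12`). `ExpectedOverResults` is a hypothetical name.

```scala
/** Hypothetical re-derivation of the expected test output as running aggregates. */
object ExpectedOverResults {
  // (a, c): a = 1, 2, 2, 3, 3, 3, ..., 5; c = 0, 1, ..., 14.
  val rows: Seq[(Int, Int)] =
    (1 to 5).flatMap(a => Seq.fill(a)(a)).zipWithIndex

  /** PARTITION BY a: running integer AVG(c) and SUM(c) within each partition. */
  def partitioned: Seq[String] =
    rows.groupBy(_._1).toSeq.flatMap { case (a, part) =>
      // Each prefix of the partition is the window content when that row arrives.
      part.map(_._2).scanLeft(List.empty[Int])((seen, c) => c :: seen).tail.map { seen =>
        s"$a,${seen.sum / seen.size},${seen.sum}"
      }
    }.sorted

  /** No partitioning: running COUNT(c) and MIN(c) over the whole stream. */
  def nonPartitioned: Seq[String] =
    rows.indices.map { i =>
      val seen = rows.take(i + 1).map(_._2)
      s"${rows(i)._1},${seen.size},${seen.min}"
    }.sorted
}
```

If rows straddled the window boundary, earlier rows would be evicted and these running values would no longer match. That timing sensitivity is a reason to prefer harness tests with controlled processing time.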
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948223#comment-15948223 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108823957 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +713,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { --- End diff -- `Aggregatation` -> `Aggregation` ?
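The expected rows in these tests are consistent with every row still being inside the 10-second window when it is processed, so the results equal per-partition running aggregates (for `a = 3` with `c = 3, 4, 5`, the sums are 3, 7 and 12). Such expectations can be cross-checked against a brute-force model of the window. A minimal sketch, assuming millisecond timestamps and an inclusive lower bound as in SQL `RANGE ... PRECEDING`; `RangeOverReference`, `Event` and `boundedRangeSum` are hypothetical names, not Flink API.

```scala
// Hypothetical reference model (not Flink code) of
// SUM(value) OVER (PARTITION BY key ORDER BY time RANGE BETWEEN bound PRECEDING AND CURRENT ROW).
object RangeOverReference {
  // One input row: partition key, processing timestamp in ms, aggregated value.
  final case class Event(key: Int, ts: Long, value: Long)

  /** For every row, sums `value` over all rows of the same key whose timestamp
    * lies in [ts - boundMs, ts] (inclusive lower bound, as assumed above).
    * Returns (key, value, windowSum) in input order. O(n^2): a test oracle only. */
  def boundedRangeSum(events: Seq[Event], boundMs: Long): Seq[(Int, Long, Long)] =
    events.map { e =>
      val windowSum = events
        .filter(o => o.key == e.key && o.ts >= e.ts - boundMs && o.ts <= e.ts)
        .map(_.value)
        .sum
      (e.key, e.value, windowSum)
    }
}
```

With a 10 000 ms bound, a row at t = 12 000 ms no longer sees a row of the same key at t = 0 ms, but still sees one at t = 4 000 ms; a row exactly 10 000 ms older is still included.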
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947506#comment-15947506 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108722044 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { + case bd: java.math.BigDecimal => bd.longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + + // get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = +// partitioned aggregation +if (partitionKeys.nonEmpty) { + + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( +namedAggregates, +inputType, +timeBoundary) + + inputDS + .keyBy(partitionKeys: _*) + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] +} else { // non-partitioned aggregation + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( --- End diff -- The `isPartitioned` parameter is not used in the `createTimeBoundedProcessingOverProcessFunction()`. 
So both calls are identical and we can move it outside of the condition and save a few LOCs.
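Since `isPartitioned` is not used, both branches build the same function and the call can be hoisted above the `if`; only the keying differs. The sketch below shows that shape without Flink dependencies, together with the numeric-boundary check from the diff. All names (`ProcFunctionSpec`, `KeyedProcess`, `NonKeyedProcess`, `translate`) are hypothetical stand-ins, and `IllegalArgumentException` replaces Flink's `TableException` only to keep the example self-contained.

```scala
object OverWindowTranslationSketch {
  // Hypothetical stand-in for the ProcessFunction built by the factory method.
  final case class ProcFunctionSpec(timeBoundaryMs: Long)

  // Hypothetical stand-ins for the keyed and non-keyed physical plans.
  sealed trait PhysicalPlan
  final case class KeyedProcess(keys: List[Int], fn: ProcFunctionSpec) extends PhysicalPlan
  final case class NonKeyedProcess(fn: ProcFunctionSpec) extends PhysicalPlan

  // Mirrors the `match` in the diff: only numeric window boundaries are accepted.
  def toTimeBoundary(constant: Any): Long = constant match {
    case bd: java.math.BigDecimal => bd.longValue()
    case _ => throw new IllegalArgumentException("OVER Window boundaries must be numeric")
  }

  def translate(partitionKeys: Array[Int], boundaryConstant: Any): PhysicalPlan = {
    // Built once: the factory does not depend on whether the input is partitioned.
    val fn = ProcFunctionSpec(toTimeBoundary(boundaryConstant))
    if (partitionKeys.nonEmpty) KeyedProcess(partitionKeys.toList, fn)
    else NonKeyedProcess(fn)
  }
}
```

In the PR itself, only the `keyBy(partitionKeys: _*)` step would remain inside the partitioned branch.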
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947510#comment-15947510 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108726727 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +/** + * Process Function used for the aggregate in bounded proc-time OVER window + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val 
rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state +// together with the ingestion time in the operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("rowmapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("overState", rowTypeInfo) +accumulatorState =
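The function quoted above keeps the received rows in a `MapState[Long, JList[Row]]` keyed by processing time, plus a `ValueState[Row]` holding the accumulators. A simplified, Flink-free model of that idea for a single key and a SUM aggregate: buckets that fall out of the window are retracted before the new value is added. Assumptions: `BoundedProcTimeSum` is a hypothetical name; a sorted `mutable.TreeMap` stands in for the `MapState`, so expired buckets can be found by scanning from the smallest key; rows that share a timestamp are aggregated in arrival order.

```scala
import scala.collection.mutable

// Hypothetical single-key model: rows bucketed by processing time, plus a running SUM.
final class BoundedProcTimeSum(boundMs: Long) {
  private val buckets = mutable.TreeMap.empty[Long, mutable.ListBuffer[Long]]
  private var windowSum = 0L

  /** Processes one value arriving at processing time `now` and returns the window SUM. */
  def process(now: Long, value: Long): Long = {
    // Retract every bucket older than the inclusive lower bound now - boundMs.
    val expired = buckets.keysIterator.takeWhile(_ < now - boundMs).toList
    expired.foreach { ts =>
      buckets.remove(ts).foreach(values => windowSum -= values.sum)
    }
    // Store the new row under its timestamp and accumulate it.
    buckets.getOrElseUpdate(now, mutable.ListBuffer.empty[Long]) += value
    windowSum += value
    windowSum
  }

  /** Timestamps that are still buffered, in ascending order. */
  def bufferedTimestamps: List[Long] = buckets.keys.toList
}
```

Removing whole buckets keeps the cost of eviction proportional to the number of distinct timestamps rather than the number of rows.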
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947505#comment-15947505 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108728846 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +716,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment --- End diff -- Yes, the test harness allows you to "control" the processing time (basically the time that is returned when the process function asks for the current processing time). I would move this test to a separate file, though, because it is a unit test (its class name should end with `Test`) and it specifically tests the `ProcTimeBoundedProcessingOverProcessFunction`.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947513#comment-15947513 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108720517 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -140,6 +140,20 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_2.10</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime_2.10</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> --- End diff -- Yes, the dependency is required for the test harness class, which allows us to control processing time. This is needed to properly test processing time operators whose semantics depend on time (not just on ordering by time, which is implicitly given for processing time). The processing time OVER RANGE window groups data by processing time. If we want to test this without manual timing (which is not good unit test practice), we need the test harness. There will be more operators in the future that require this dependency.
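The harness matters because the operator's output depends on what "now" is. A Flink-free sketch of the same idea, assuming the operator reads processing time from an injected clock that the test advances explicitly instead of sleeping; `TimeSource`, `TestClock` and `CountLastInterval` are hypothetical names, not the harness API from the `flink-streaming-java` test-jar.

```scala
import scala.collection.mutable

// Hypothetical time source injected into the operator instead of System.currentTimeMillis().
trait TimeSource { def currentTime: Long }

// Test-only clock that moves only when the test says so.
final class TestClock extends TimeSource {
  private var now = 0L
  def currentTime: Long = now
  def advanceTo(t: Long): Unit = {
    require(t >= now, "processing time must not go backwards")
    now = t
  }
}

// Hypothetical operator: COUNT(*) over RANGE BETWEEN boundMs PRECEDING AND CURRENT ROW.
final class CountLastInterval(clock: TimeSource, boundMs: Long) {
  private val arrivals = mutable.Queue.empty[Long]

  /** Records one element at the clock's current time and returns the window count. */
  def onElement(): Int = {
    val now = clock.currentTime
    // Drop arrivals older than the inclusive lower bound now - boundMs.
    while (arrivals.nonEmpty && arrivals.head < now - boundMs) arrivals.dequeue()
    arrivals.enqueue(now)
    arrivals.size
  }
}
```

Because the test decides exactly when time moves, the window boundary (for example, a row exactly 10 000 ms old versus one 10 001 ms old) can be checked deterministically.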
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947507#comment-15947507 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108726942 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947512#comment-15947512 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108721762 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { --- End diff -- Yes, it is not used in the function. Removing it should be fine.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947509#comment-15947509 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108722685 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) --- End diff -- `needRetraction` must be `true`. This is probably not caught by the tests, because the agg functions in the test are always retractable. You could use a `min` or `max` aggregation, which have dedicated implementations for retraction. Actually, we should check whether the over window tests include `min` and `max` agg functions as well. I'll do that once all over windows have been merged.
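With a bounded window, every row that leaves the window has to be retracted from the accumulators, which is why `needRetraction` has to be `true`. SUM or COUNT can retract by subtraction, but a MIN that only stores the current minimum cannot: once that minimum leaves the window, it has no record of the next-smallest value. A minimal sketch of one way a retractable MIN can work, assuming a per-value count kept in a sorted map; `MinWithRetractAcc` is a hypothetical name and not Flink's implementation.

```scala
import scala.collection.mutable

// Hypothetical retractable MIN accumulator (not Flink's implementation).
final class MinWithRetractAcc {
  // value -> number of rows currently in the window holding that value
  private val counts = mutable.TreeMap.empty[Long, Int]

  def accumulate(v: Long): Unit = counts(v) = counts.getOrElse(v, 0) + 1

  def retract(v: Long): Unit = counts.get(v) match {
    case Some(1) => counts.remove(v); ()
    case Some(n) => counts(v) = n - 1
    case None    => throw new IllegalStateException(s"cannot retract unseen value $v")
  }

  // Smallest value still in the window, if any.
  def value: Option[Long] = counts.headOption.map(_._1)
}
```

A test only exercises this path if some row actually leaves the window before the query finishes.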
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947504#comment-15947504 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108726645 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947508#comment-15947508 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108725149 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +/** + * Process Function used for the aggregate in bounded proc-time OVER window + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val 
rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state --- End diff -- update comment? We keep the data in a `MapState` indexed by ingestion time?
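As the review notes, the function buffers rows in a `MapState` keyed by ingestion (processing) time rather than in a list state. Below is a minimal plain-Scala model of that layout, not the PR's code: the `ProcTimeRangeBuffer` class is hypothetical, an immutable `TreeMap` stands in for `MapState[Long, JList[Row]]`, and treating the lower bound `now - timeBoundaryMs` as inclusive is an assumption about the RANGE semantics.

```scala
import scala.collection.immutable.TreeMap

// Hypothetical plain-Scala model (no Flink dependency) of a state map that
// groups rows by the processing time at which they arrived, in the spirit of
// MapState[Long, JList[Row]]. An immutable List replaces JList.
final class ProcTimeRangeBuffer[R](timeBoundaryMs: Long) {
  // Keys are sorted ascending, so expired buckets are always at the front.
  private var rowsByTime = TreeMap.empty[Long, List[R]]

  /** Adds `row` arriving at processing time `now` and returns the rows whose
    * arrival time lies in [now - timeBoundaryMs, now] (assumed inclusive). */
  def add(now: Long, row: R): List[R] = {
    val sameMillis = rowsByTime.getOrElse(now, Nil)
    rowsByTime = rowsByTime.updated(now, sameMillis :+ row)
    val lowerBound = now - timeBoundaryMs
    // Evict whole buckets that fell out of the window.
    rowsByTime = rowsByTime.dropWhile { case (ts, _) => ts < lowerBound }
    rowsByTime.values.flatten.toList
  }

  /** Number of distinct arrival timestamps currently held in state. */
  def bucketCount: Int = rowsByTime.size
}
```

Grouping rows per millisecond keeps one state entry per arrival time, so expiry can drop whole entries instead of scanning a single ever-growing list.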
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947511#comment-15947511 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108725947 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +/** + * Process Function used for the aggregate in bounded proc-time OVER window + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val 
rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state +// together with the ingestion time in the operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("rowmapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("overState", rowTypeInfo) +accumulatorState =
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947501#comment-15947501 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108730528 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +713,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) + +val rT = new RowTypeInfo(Array[TypeInformation[_]]( + INT_TYPE_INFO, + LONG_TYPE_INFO, + INT_TYPE_INFO, + STRING_TYPE_INFO, + LONG_TYPE_INFO), + Array("a","b","c","d","e")) + +val rTA = new RowTypeInfo(Array[TypeInformation[_]]( + LONG_TYPE_INFO), Array("count")) + +val processFunction = new KeyedProcessOperator[String,Row,Row]( + new ProcTimeBoundedProcessingOverProcessFunction( +Array(new CountAggFunction), +Array(1), +5, +rTA, +1000, +rT)) + +val rInput:Row = new Row(5) + rInput.setField(0, 1) + rInput.setField(1, 11L) + rInput.setField(2, 1) + rInput.setField(3, "aaa") + rInput.setField(4, 11L) + + val testHarness = new KeyedOneInputStreamOperatorTestHarness[String,Row,Row]( + processFunction, + new TupleRowSelector(3), + BasicTypeInfo.STRING_TYPE_INFO) + + testHarness.open(); + + testHarness.setProcessingTime(3) + + // timestamp is ignored in processing time +testHarness.processElement(new StreamRecord(rInput, 1001)) +
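Since the whole job presumably finishes well within 10 seconds, every OVER window in both ITCases above still holds all earlier rows of its partition, so the expected lists reduce to running aggregates. The plain-Scala sketch below recomputes them as a sanity check. `OverWindowExpectations` is a hypothetical helper. The input columns (a, c) are inferred from the expected rows and are an assumption. `AVG` is assumed to use integer division on the INT column, which the expected `3,3,7` row implies. Also, in the non-partitioned query the aliases `avgC` and `sumC` actually label `COUNT(c)` and `MIN(c)`.

```scala
// Model of the two ITCases under one explicit assumption: every row arrives
// within 10 seconds, so each OVER window contains all earlier rows of its
// partition. This is not Flink code.
object OverWindowExpectations {
  // (a, c) columns in arrival order: a = 1, 2, 2, 3, 3, 3, ...; c = 0 to 14.
  val input: Seq[(Int, Int)] =
    (for (a <- 1 to 5; _ <- 1 to a) yield a).zipWithIndex

  // PARTITION BY a: AVG(c) (integer division) and SUM(c) for each output row.
  def partitionedAvgSum(rows: Seq[(Int, Int)]): Seq[String] =
    rows.indices.map { i =>
      val a = rows(i)._1
      val window = rows.take(i + 1).filter(_._1 == a).map(_._2)
      s"$a,${window.sum / window.size},${window.sum}"
    }

  // No PARTITION BY: COUNT(c) and MIN(c) over all rows seen so far.
  def nonPartitionedCountMin(rows: Seq[(Int, Int)]): Seq[String] =
    rows.indices.map { i =>
      val window = rows.take(i + 1).map(_._2)
      s"${rows(i)._1},${window.size},${window.min}"
    }
}
```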
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947514#comment-15947514 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108722922 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { + +val (aggFields, aggregates) = + transformToAggregateFunctions( +namedAggregates.map(_.getKey), +inputType, +needRetraction = false) +val aggregationStateType: RowTypeInfo = + createDataSetAggregateBufferDataType(Array(), aggregates, inputType) --- End diff -- use `createAccumulatorRowType(aggregates)` > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > 
processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
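The issue's sample query evaluates `SUM(b)` and `MIN(b)` over the same window, and the review suggests deriving the state type with `createAccumulatorRowType(aggregates)`. Presumably this yields one field per aggregate's accumulator, though that is an assumption not checked against the Flink source. The plain-Scala sketch below illustrates that layout with a hypothetical `LongAgg` interface. On each evaluation it recomputes the accumulators from the rows still in the window. This sidesteps retraction, since `MIN`, unlike `SUM`, has no cheap way to remove an expired value.

```scala
// Hypothetical minimal aggregate interface for this sketch only; it is not
// Flink's org.apache.flink.table.functions.AggregateFunction.
trait LongAgg {
  def createAccumulator(): Long
  def accumulate(acc: Long, value: Long): Long
}

object SumAgg extends LongAgg {
  def createAccumulator(): Long = 0L
  def accumulate(acc: Long, value: Long): Long = acc + value
}

object MinAgg extends LongAgg {
  def createAccumulator(): Long = Long.MaxValue
  def accumulate(acc: Long, value: Long): Long = math.min(acc, value)
}

object OverAggSketch {
  /** Builds one accumulator per aggregate (one "field" of the accumulator
    * row each) and feeds every value currently in the window into it. */
  def evaluate(aggs: Seq[LongAgg], windowValues: Seq[Long]): Seq[Long] =
    aggs.map { agg =>
      windowValues.foldLeft(agg.createAccumulator())((acc, v) => agg.accumulate(acc, v))
    }
}
```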
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947515#comment-15947515 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108729682 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +713,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) --- End diff -- The test harness test does not need `StreamExecutionEnvironment` and `TableEnvironment`.
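The point about the harness test is that the operator logic can be exercised by setting processing time by hand, without a `StreamExecutionEnvironment` or `TableEnvironment`. The plain-Scala sketch below mimics that style for a keyed COUNT over a 1000 ms range, the boundary used in the harness test. `KeyedProcTimeCounter` and its methods are hypothetical stand-ins, not the `KeyedOneInputStreamOperatorTestHarness` API, and the inclusive lower bound is an assumption.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a keyed operator driven by a manual clock, in the
// style of a harness test; this is not Flink's test-harness API.
final class KeyedProcTimeCounter(timeBoundaryMs: Long) {
  private var now = 0L
  // Arrival times per key, playing the role of keyed state.
  private val arrivals = mutable.Map.empty[String, Vector[Long]]

  /** Sets the simulated processing time (milliseconds). */
  def setTime(t: Long): Unit = { now = t }

  /** Records one element for `key` at the current time and returns the
    * COUNT over [now - timeBoundaryMs, now] for that key only. */
  def processElement(key: String): Long = {
    val kept = arrivals.getOrElse(key, Vector.empty).filter(_ >= now - timeBoundaryMs) :+ now
    arrivals(key) = kept
    kept.size.toLong
  }
}
```

Keeping a separate entry per key means each key's window expires independently, which is the isolation keyed state gives the real operator.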
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947503#comment-15947503 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108727511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo + +/** + * Process Function used for the aggregate in bounded proc-time OVER window + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val 
rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state +// together with the ingestion time in the operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("rowmapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val stateDescriptor: ValueStateDescriptor[Row] = + new ValueStateDescriptor[Row]("overState", rowTypeInfo) +accumulatorState =
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947502#comment-15947502 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108724916 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, --- End diff -- Another option would be `precedingTimeBoundary` ;-). But I'm also fine with `timeBoundary`.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947437#comment-15947437 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @sunjincheng121 - thanks for the review. I addressed most of the comments. @fhueske
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947406#comment-15947406 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108717894 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -140,6 +140,20 @@ under the License. ${project.version} test + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} + test-jar + test + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + test-jar + --- End diff -- @sunjincheng121 As in my previous answer, @fhueske explicitly asked for this test and these dependencies. Initially there was supposed to be only one dependency, but it has a dependency of its own, which is why both are here.
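For context on the test-jar dependencies: Flink's operator test harnesses (for example `KeyedOneInputStreamOperatorTestHarness`) live in the flink-streaming-java test sources and presumably rely on mock runtime classes from the flink-runtime test-jar, which would explain the second dependency. What such a harness buys is that the test, not the wall clock, decides the current processing time. The Scala below is a minimal, Flink-free sketch of that idea; `ProcTimeSumOperator` and `ManualClockHarness` are hypothetical names, and the window is assumed to be `(now - boundary, now]`.

```scala
import scala.collection.mutable

// Hypothetical operator: for each element, emits the sum of all values that
// arrived within the last `boundaryMs` milliseconds of processing time.
// Assumed window: (now - boundaryMs, now].
class ProcTimeSumOperator(boundaryMs: Long) {
  private val buffer = mutable.Queue.empty[(Long, Int)] // (arrival time, value)

  def processElement(value: Int, now: Long): Int = {
    buffer.enqueue((now, value))
    // Evict elements that are no longer inside the window.
    while (buffer.nonEmpty && buffer.head._1 <= now - boundaryMs) buffer.dequeue()
    buffer.iterator.map(_._2).sum
  }
}

// Hypothetical harness: the test sets the processing time explicitly, so the
// output does not depend on how fast the test machine happens to run.
class ManualClockHarness(operator: ProcTimeSumOperator) {
  private var currentTime = 0L
  val output = mutable.ListBuffer.empty[Int]

  def setProcessingTime(time: Long): Unit = currentTime = time

  def processElement(value: Int): Unit =
    output += operator.processElement(value, currentTime)
}
```

With a 1000 ms boundary, feeding 1, 2, 3 and 4 at processing times 0, 500, 1200 and 2500 produces the sums 1, 3, 5 and 4, regardless of real elapsed time.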
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947402#comment-15947402 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108717425 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +716,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment --- End diff -- Simple answer: @fhueske has been asking for it for quite some time. Long answer: we need to use the framework that was developed specifically to test the processing-time behavior of operators.
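The expected lists in both tests above are running aggregates in which no row ever leaves the 10-second range, i.e. they implicitly assume the whole input is processed within 10 seconds of wall-clock time, which is one reason a test with controlled processing time is attractive. The sketch below recomputes both lists under that assumption; the `(a, c)` pairs are inferred from the expected results rather than copied from `StreamTestData`, and AVG is assumed to use integer division for the INT column `c`.

```scala
// (a, c) pairs of the 15 input rows in arrival order. They are inferred from
// the expected results of the two tests, not copied from StreamTestData.
object ExpectedOverResults {
  val rows: Seq[(Int, Int)] = Seq(
    (1, 0), (2, 1), (2, 2), (3, 3), (3, 4), (3, 5), (4, 6), (4, 7),
    (4, 8), (4, 9), (5, 10), (5, 11), (5, 12), (5, 13), (5, 14))

  // PARTITION BY a: running AVG(c) and SUM(c) per key. AVG is assumed to use
  // integer division (INT column), and no row is assumed to expire.
  def partitioned: Seq[String] =
    rows.groupBy(_._1).toSeq.flatMap { case (a, group) =>
      group.map(_._2)
        .scanLeft((0, 0)) { case ((count, sum), c) => (count + 1, sum + c) }
        .tail
        .map { case (count, sum) => s"$a,${sum / count},$sum" }
    }

  // No PARTITION BY: running COUNT(c) and MIN(c) over all rows, same assumption.
  def nonPartitioned: Seq[String] =
    rows
      .scanLeft((0, Int.MaxValue, 0)) { case ((count, lowest, _), (a, c)) =>
        (count + 1, math.min(lowest, c), a)
      }
      .tail
      .map { case (count, lowest, a) => s"$a,$count,$lowest" }
}
```

If some rows did fall out of the range (for instance on a slow CI machine), the later sums and counts would be smaller than listed, so the wall-clock-driven tests can only pass when processing is fast.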
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947400#comment-15947400 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108717176 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -19,7 +19,10 @@ package org.apache.flink.table.api.scala.stream.sql import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction +import org.apache.flink.table.api.scala.stream.sql.SqlITCase.{ + EventTimeSourceFunction, + RowResultSortComparator, + TupleRowSelector} --- End diff -- Done... but I must admit I am confused about how the style is applied. I thought we aim for lines shorter than 100 characters; why would this be an exception? @sunjincheng121
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947396#comment-15947396 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108716511 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state +// together with the ingestion time in the operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("rowmapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +val
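The `open` method above keeps the received rows in a `MapState[Long, JList[Row]]` keyed by the time at which they were ingested, so rows that arrive in the same millisecond share one entry. A minimal Scala analogue of that layout could look like the following, assuming the window covers `(now - boundaryMs, now]` and that expired entries are dropped when a new row arrives. `BoundedProcTimeRangeState` is hypothetical, and the sorted `TreeMap` is a simplification of this sketch: Flink's `MapState` does not promise any key order.

```scala
import scala.collection.mutable

// Hypothetical per-key state for a bounded processing-time OVER RANGE window.
// Rows are grouped by arrival time, mirroring a MapState[Long, JList[Row]];
// a sorted map (an assumption of this sketch) makes expired keys a prefix.
class BoundedProcTimeRangeState(boundaryMs: Long) {
  require(boundaryMs > 0, "the current row must stay inside its own window")
  private val rowsByTime = mutable.TreeMap.empty[Long, mutable.ListBuffer[Int]]

  /** Adds `value` at processing time `now`; returns (SUM, MIN) over the window. */
  def add(value: Int, now: Long): (Int, Int) = {
    rowsByTime.getOrElseUpdate(now, mutable.ListBuffer.empty[Int]) += value
    // Assumed window (now - boundaryMs, now]: drop all older arrival times.
    val expired = rowsByTime.keysIterator.takeWhile(_ <= now - boundaryMs).toList
    rowsByTime --= expired
    val live = rowsByTime.values.flatten.toList
    (live.sum, live.min)
  }
}
```

With a 1000 ms boundary, adding 5 and 3 at t = 0, 7 at t = 600, 1 at t = 1000 and 9 at t = 1700 yields (5,5), (8,3), (15,3), (8,1) and (10,1): the two rows stored under t = 0 expire together at t = 1000.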
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947393#comment-15947393 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108716153 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, --- End diff -- see previous comment
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947387#comment-15947387 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108715515 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, --- End diff -- @sunjincheng121 :) I feel it is more expressive to mention time, as the window is defined by time boundaries. Offset sounds to me more related to indexes... but I am not a native speaker.
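For reference, the `timeBoundary` value is produced in `DataStreamOverAggregate.createTimeBoundedProcessingTimeOverWindow` (quoted in this thread): the lower bound's offset is a `RexInputRef` whose index points past the input row's fields into the window's constants, hence `index - count`, and the constant is expected to be a `java.math.BigDecimal`. The Scala below isolates that lookup without Calcite. `OverBoundary.precedingMillis` is a hypothetical helper, and it assumes, as the Scaladoc says, that the literal already holds milliseconds (so `INTERVAL '10' SECOND` would presumably arrive as `10000`).

```scala
// Hypothetical helper mirroring the lookup in createTimeBoundedProcessingTimeOverWindow.
// The lower-bound offset is an input reference whose index lies past the input
// row's fields, so subtracting the field count yields a position in `constants`.
object OverBoundary {
  def precedingMillis(refIndex: Int, inputFieldCount: Int, constants: IndexedSeq[Any]): Long =
    constants(refIndex - inputFieldCount) match {
      case bd: java.math.BigDecimal => bd.longValue()
      // The PR throws Flink's TableException; a standard exception stands in here.
      case _ => throw new IllegalArgumentException("OVER Window boundaries must be numeric")
    }
}
```

For a five-field input such as `(a, b, c, d, e)`, a reference index of 5 selects the first window constant.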
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947386#comment-15947386 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108715233 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { + case bd: java.math.BigDecimal => bd.longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + + // get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = +// partitioned aggregation +if (partitionKeys.nonEmpty) { + + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( +namedAggregates, +inputType, +timeBoundary) + + inputDS + .keyBy(partitionKeys: _*) + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] +} else { // non-partitioned aggregation + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( --- End diff -- @sunjincheng121 I disagree with this. 
Even if we move the creation of the process function above the if (which is fine), we still need to distinguish what to use in the keyBy: the actual key fields or a NullSelector. Hence, since the if is needed anyway, I prefer to keep it as it is.
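The point about the `if` can be illustrated without Flink: both branches can share one process function, but the key differs. With PARTITION BY, rows are keyed by the partition fields; without it, every row gets the same constant key, so all rows end up in one group and are processed by a single task. Flink ships `org.apache.flink.api.java.functions.NullByteKeySelector` for the constant-key case, although the truncated diff does not show which selector the PR actually uses. `OverKeying` below is a hypothetical, Flink-free stand-in.

```scala
// Hypothetical, Flink-free stand-in for choosing the key of an OVER window.
object OverKeying {
  type Row = IndexedSeq[Any]

  // PARTITION BY present: key on the partition fields.
  // PARTITION BY absent: a constant key (the role NullByteKeySelector plays in
  // Flink), so every row lands in one group.
  def keySelector(partitionKeys: Array[Int]): Row => Any =
    if (partitionKeys.nonEmpty) {
      (row: Row) => partitionKeys.toList.map(i => row(i))
    } else {
      (_: Row) => 0.toByte
    }

  def groupRows(rows: Seq[Row], partitionKeys: Array[Int]): Map[Any, Seq[Row]] =
    rows.groupBy(keySelector(partitionKeys))
}
```

Grouping three rows whose first fields are 1, 2 and 1 yields two groups when partitioned on field 0 and a single group when no partition keys are given.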
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947357#comment-15947357 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108708096 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -19,7 +19,10 @@ package org.apache.flink.table.api.scala.stream.sql import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala.stream.sql.SqlITCase.EventTimeSourceFunction +import org.apache.flink.table.api.scala.stream.sql.SqlITCase.{ + EventTimeSourceFunction, + RowResultSortComparator, + TupleRowSelector} --- End diff -- Keep it on one line.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947362#comment-15947362 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108705166 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, --- End diff -- Just a suggestion: `timeBoundary -> precedingOffset`. :)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947353#comment-15947353 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108703534 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, +isPartitioned: Boolean = true): ProcessFunction[Row, Row] = { --- End diff -- We can remove this param. What do you think?
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947363#comment-15947363 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108701977 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -17,24 +17,38 @@ */ package org.apache.flink.table.plan.nodes.datastream -import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import java.util.{ List => JList } + +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair +import org.apache.calcite.plan.RelOptCluster +import org.apache.calcite.plan.RelTraitSet --- End diff -- Please remove the unused import.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947352#comment-15947352 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108703070 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { + case bd: java.math.BigDecimal => bd.longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + + // get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = +// partitioned aggregation +if (partitionKeys.nonEmpty) { + + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( --- End diff -- Move it before the `if` statement.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947355#comment-15947355 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108703385 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { + case bd: java.math.BigDecimal => bd.longValue() + case _ => throw new TableException("OVER Window boundaries must be numeric") +} + + // get the output types +val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo] + +val result: DataStream[Row] = +// partitioned aggregation +if (partitionKeys.nonEmpty) { + + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( +namedAggregates, +inputType, +timeBoundary) + + inputDS + .keyBy(partitionKeys: _*) + .process(processFunction) + .returns(rowTypeInfo) + .name(aggOpName) + .asInstanceOf[DataStream[Row]] +} else { // non-partitioned aggregation + val processFunction = AggregateUtil.createTimeBoundedProcessingOverProcessFunction( --- End diff -- We can remove this if `processFunction` is created before the `if` statement.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947356#comment-15947356 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108703979 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala --- @@ -1208,5 +1208,40 @@ object AggregateUtil { private def gcd(a: Long, b: Long): Long = { if (b == 0) a else gcd(b, a % b) } -} + + /** + * Create an [[org.apache.flink.streaming.api.functions.ProcessFunction]] to evaluate final + * aggregate value over a window with processing time boundaries. + * + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param inputType Input row type + * @param timeBoundary time limit of the window boundary expressed in milliseconds + * @param isPartitioned Flag to indicate whether the input is partitioned or not + * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] + */ + private[flink] def createTimeBoundedProcessingOverProcessFunction( +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +inputType: RelDataType, +timeBoundary: Long, --- End diff -- Just a suggestion: `timeBoundary -> precedingOffset`. :)
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947361#comment-15947361 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108704941 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded --- End diff -- Remove `partitioned`: `Process Function used for the aggregate in partitioned bounded` -> `Process Function used for the aggregate in bounded proc-time OVER window`
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947366#comment-15947366 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108709629 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala --- @@ -696,6 +716,205 @@ class SqlITCase extends StreamingWithStateTestBase { "6,8,Hello world,51,9,5,9,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testAvgSumAggregatationPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, AVG(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "SUM(c) OVER (PARTITION BY a ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,0,0", + "2,1,1", + "2,1,3", + "3,3,3", + "3,3,7", + "3,4,12", + "4,6,13", + "4,6,6", + "4,7,21", + "4,7,30", + "5,10,10", + "5,10,21", + "5,11,33", + "5,11,46", + "5,12,60") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testAvgSumAggregatationNonPartition(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.testResults = mutable.MutableList() + +val sqlQuery = "SELECT a, Count(c) OVER (ORDER BY 
procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) AS avgC," + + "MIN(c) OVER (ORDER BY procTime()" + + "RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) as sumC FROM MyTable" + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv).as('a, 'b, 'c, 'd, 'e) + +tEnv.registerTable("MyTable", t) + +val result = tEnv.sql(sqlQuery).toDataStream[Row] +result.addSink(new StreamITCase.StringSink) +env.execute() + +val expected = mutable.MutableList( + "1,1,0", + "2,2,0", + "2,3,0", + "3,4,0", + "3,5,0", + "3,6,0", + "4,7,0", + "4,8,0", + "4,9,0", + "4,10,0", + "5,11,0", + "5,12,0", + "5,13,0", + "5,14,0", + "5,15,0") + +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + + @Test + def testCountAggregatationProcTimeHarnessPartitioned(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment --- End diff -- Can you explain why we need this test?
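The expected rows in `testAvgSumAggregatationPartition` and `testAvgSumAggregatationNonPartition` can be reproduced by hand if all 15 rows reach the operator within 10 seconds of each other, so that no row is ever evicted from the window. The Scala sketch below is a hypothetical check, not part of the PR: the `(a, c)` pairs are inferred from the expected output (c runs from 0 to 14 in arrival order), AVG on an INT column is assumed to truncate like integer division, and `ExpectedOverResults` is an illustrative name.

```scala
import scala.collection.mutable

// Hypothetical reconstruction of the expected test rows, assuming no row
// ever leaves the 10-second window (every row is still "recent").
object ExpectedOverResults {
  // (a, c) pairs in arrival order, as implied by the expected output.
  val rows: Seq[(Int, Int)] =
    Seq(1 -> 0, 2 -> 1, 2 -> 2, 3 -> 3, 3 -> 4, 3 -> 5,
        4 -> 6, 4 -> 7, 4 -> 8, 4 -> 9,
        5 -> 10, 5 -> 11, 5 -> 12, 5 -> 13, 5 -> 14)

  /** PARTITION BY a: emits "a,AVG(c),SUM(c)" per row, AVG truncated to Int. */
  def partitioned: Seq[String] = {
    val sums = mutable.Map.empty[Int, Int].withDefaultValue(0)
    val counts = mutable.Map.empty[Int, Int].withDefaultValue(0)
    rows.map { case (a, c) =>
      sums(a) += c
      counts(a) += 1
      s"$a,${sums(a) / counts(a)},${sums(a)}"
    }
  }

  /** No PARTITION BY: emits "a,COUNT(c),MIN(c)" over every row seen so far. */
  def nonPartitioned: Seq[String] =
    rows.indices.map { i =>
      val seen = rows.take(i + 1).map(_._2)
      s"${rows(i)._1},${seen.size},${seen.min}"
    }
}
```

For example, the partition `a = 4` receives c = 6, 7, 8, 9, giving running sums 6, 13, 21, 30 and truncated averages 6, 6, 7, 7, which matches the `"4,..."` rows in the expected list. If rows arrived more than 10 seconds apart, older rows would drop out and these values would no longer hold.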
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947365#comment-15947365 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108697224 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala --- @@ -274,6 +286,57 @@ class DataStreamOverAggregate( } result } + + def createTimeBoundedProcessingTimeOverWindow(inputDS: DataStream[Row]): DataStream[Row] = { + +val overWindow: Group = logicWindow.groups.get(0) +val partitionKeys: Array[Int] = overWindow.keys.toArray +val namedAggregates: Seq[CalcitePair[AggregateCall, String]] = generateNamedAggregates + +val index = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef].getIndex +val count = input.getRowType.getFieldCount +val lowerBoundIndex = index - count + + +val timeBoundary = logicWindow.constants.get(lowerBoundIndex).getValue2 match { --- End diff -- Please use `OverAggregate#getLowerBoundary`.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947360#comment-15947360 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108706761 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala --- @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor } +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext } +import org.apache.flink.streaming.api.functions.ProcessFunction +import org.apache.flink.table.functions.{ Accumulator, AggregateFunction } +import org.apache.flink.types.Row +import org.apache.flink.util.{ Collector, Preconditions } +import org.apache.flink.api.common.state.ValueState +import org.apache.flink.api.common.state.ValueStateDescriptor +import scala.util.control.Breaks._ +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 } +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import scala.collection.mutable.Queue +import org.apache.flink.api.common.typeinfo.TypeHint +import org.apache.flink.api.common.state.MapState +import org.apache.flink.api.common.state.MapStateDescriptor +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ListTypeInfo +import java.util.{ ArrayList, LinkedList, List => JList } + +/** + * Process Function used for the aggregate in partitioned bounded windows in + * [[org.apache.flink.streaming.api.datastream.DataStream]] + * + * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]] + * used for this aggregation + * @param aggFields the position (in the input Row) of the input value for each aggregate + * @param forwardedFieldCount Is used to indicate fields in the current element to forward + * @param rowTypeInfo Is used to indicate the field schema + * @param timeBoundary Is used to indicate the processing time boundaries + * @param inputType It is used to mark the Row type of the input + */ +class 
ProcTimeBoundedProcessingOverProcessFunction( + private val aggregates: Array[AggregateFunction[_]], + private val aggFields: Array[Int], + private val forwardedFieldCount: Int, + private val rowTypeInfo: RowTypeInfo, + private val timeBoundary: Long, + private val inputType: TypeInformation[Row]) +extends ProcessFunction[Row, Row] { + + Preconditions.checkNotNull(aggregates) + Preconditions.checkNotNull(aggFields) + Preconditions.checkArgument(aggregates.length == aggFields.length) + + private var output: Row = _ + private var accumulatorState: ValueState[Row] = _ + private var rowMapState: MapState[Long, JList[Row]] = _ + + override def open(config: Configuration) { +output = new Row(forwardedFieldCount + aggregates.length) + +// We keep the elements received in a list state +// together with the ingestion time in the operator +val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] +val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = + new MapStateDescriptor[Long, JList[Row]]("rowmapstate", +BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) +rowMapState = getRuntimeContext.getMapState(mapStateDescriptor) + +
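The `open()` method in this diff keeps received rows in a `MapState[Long, JList[Row]]` keyed by the processing time at which they arrived. The following Flink-free sketch illustrates that idea for a single partition key, under stated assumptions: a `mutable.TreeMap` stands in for keyed state, processing time is passed in explicitly instead of coming from a timer service, the window is taken to cover `[procTime - precedingOffset, procTime]`, and `BoundedRangeOverSketch` is a hypothetical name. For simplicity it recomputes SUM and MIN from the retained rows on every element, whereas the diff also keeps an `accumulatorState`.

```scala
import scala.collection.mutable

/** Hypothetical sketch (not Flink API): RANGE BETWEEN precedingOffset PRECEDING
  * AND CURRENT ROW over processing time, computing SUM and MIN for one key. */
final class BoundedRangeOverSketch(precedingOffset: Long) {
  require(precedingOffset >= 0, "precedingOffset must be non-negative")

  // arrival (processing) time -> values that arrived at that time;
  // plays the role of the MapState[Long, JList[Row]] in the diff
  private val rowsByTime = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Long]]

  /** Registers `value` arriving at `procTime` and returns (SUM, MIN) over the window. */
  def process(procTime: Long, value: Long): (Long, Long) = {
    rowsByTime.getOrElseUpdate(procTime, mutable.ArrayBuffer.empty[Long]) += value

    // Keys are sorted, so the expired entries are exactly the leading ones.
    val lowerBound = procTime - precedingOffset
    val expired = rowsByTime.keysIterator.takeWhile(_ < lowerBound).toList
    expired.foreach(key => rowsByTime.remove(key))

    // The current row is always retained, so the window is never empty here.
    val inWindow = rowsByTime.values.flatten.toList
    (inWindow.sum, inWindow.min)
  }
}
```

With `precedingOffset = 10`, values 5, 3, 7 arriving at times 0, 4, 10 all stay in the window; a value 1 arriving at time 12 evicts the row from time 0, so the result becomes SUM 11 and MIN 1. Keying by arrival time keeps expired entries at the front of the sorted map, so eviction only scans the oldest keys.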
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947358#comment-15947358 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108709954 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -140,6 +140,20 @@ under the License. ${project.version} test + + org.apache.flink + flink-streaming-java_2.10 + ${project.version} + test-jar + test + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + test + test-jar + --- End diff -- Can we remove this dependency If we remove `testCountAggregatationProcTimeHarnessPartitioned`? > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
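The query semantics in the issue description can be pinned down with a small, Flink-independent reference sketch. It assumes each input row carries a partition key `c`, a value `b`, and the processing timestamp in milliseconds at which it arrived, and that the frame `RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW` covers every row of the same partition with a timestamp in `[ts - 1h, ts]` (inclusive lower bound assumed). In RANGE mode, CURRENT ROW also covers peers that share the same timestamp. The names `InRow`, `OutRow` and `OverRangeReference` are hypothetical and not Flink classes.

```scala
// Hypothetical records for illustration; these are not Flink types.
final case class InRow(a: String, b: Long, c: String, procTime: Long)
final case class OutRow(a: String, sumB: Long, minB: Long)

object OverRangeReference {
  /** Naive O(n^2) reference for
    *   SUM(b)/MIN(b) OVER (PARTITION BY c ORDER BY procTime()
    *     RANGE BETWEEN <rangeMillis> PRECEDING AND CURRENT ROW)
    * A row's frame holds every row of its partition whose timestamp lies in
    * [procTime - rangeMillis, procTime]; peers with an equal timestamp are
    * included, as RANGE mode requires. With partitioned = false all rows form
    * one group, like a query without PARTITION BY.
    */
  def evaluate(rows: Seq[InRow], rangeMillis: Long, partitioned: Boolean = true): Seq[OutRow] =
    rows.map { cur =>
      val frame = rows.filter { r =>
        (!partitioned || r.c == cur.c) &&
        r.procTime >= cur.procTime - rangeMillis &&
        r.procTime <= cur.procTime
      }
      val bs = frame.map(_.b)
      // The frame is never empty: it always contains `cur` itself.
      OutRow(cur.a, bs.sum, bs.min)
    }
}
```

For example, with a one-hour range (3 600 000 ms), a row of partition `x` arriving at 3 601 000 ms only aggregates rows of `x` that arrived at 1 000 ms or later. A streaming operator would not rescan all rows like this; the sketch only fixes the expected results.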
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947364#comment-15947364 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108705942
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.typeinfo.TypeHint
+import org.apache.flink.api.common.state.MapState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.ListTypeInfo
+import java.util.{ ArrayList, LinkedList, List => JList }
+
+/**
+ * Process Function used for the aggregate in partitioned bounded windows in
+ * [[org.apache.flink.streaming.api.datastream.DataStream]]
+ *
+ * @param aggregates the list of all [[org.apache.flink.table.functions.AggregateFunction]]
+ * used for this aggregation
+ * @param aggFields the position (in the input Row) of the input value for each aggregate
+ * @param forwardedFieldCount Is used to indicate fields in the current element to forward
+ * @param rowTypeInfo Is used to indicate the field schema
+ * @param timeBoundary Is used to indicate the processing time boundaries
+ * @param inputType It is used to mark the Row type of the input
+ */
+class ProcTimeBoundedProcessingOverProcessFunction(
+  private val aggregates: Array[AggregateFunction[_]],
+  private val aggFields: Array[Int],
+  private val forwardedFieldCount: Int,
+  private val rowTypeInfo: RowTypeInfo,
+  private val timeBoundary: Long,
+  private val inputType: TypeInformation[Row])
+extends ProcessFunction[Row, Row] {
+
+  Preconditions.checkNotNull(aggregates)
+  Preconditions.checkNotNull(aggFields)
+  Preconditions.checkArgument(aggregates.length == aggFields.length)
+
+  private var output: Row = _
+  private var accumulatorState: ValueState[Row] = _
+  private var rowMapState: MapState[Long, JList[Row]] = _
+
+  override def open(config: Configuration) {
+    output = new Row(forwardedFieldCount + aggregates.length)
+
+    // We keep the elements received in a list state
+    // together with the ingestion time in the operator
+    val rowListTypeInfo: TypeInformation[JList[Row]] =
+      new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]]
+    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
+      new MapStateDescriptor[Long, JList[Row]]("rowmapstate",
+        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo)
+    rowMapState = getRuntimeContext.getMapState(mapStateDescriptor)
+
+
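The `MapState[Long, JList[Row]]` in this function stores received rows grouped by their processing timestamp. The data-structure idea can be sketched without Flink as a per-partition sorted map from timestamp to values, where whole buckets are evicted once they fall out of the range. This is a hypothetical illustration, not the PR's implementation: the class name `BoundedProcTimeRange` is invented, the lower bound is assumed inclusive, and `MIN` is simply recomputed over the retained values because it cannot be "subtracted" when rows expire.

```scala
import scala.collection.mutable

/** Hypothetical, Flink-independent sketch: values are bucketed by arrival
  * (processing) timestamp per partition key, mirroring a state of the form
  * MapState[Long, JList[Row]]. Not the PR's implementation.
  */
final class BoundedProcTimeRange(rangeMillis: Long) {
  // partition key -> (timestamp -> values that arrived at that timestamp)
  private val state = mutable.Map.empty[String, mutable.TreeMap[Long, Vector[Long]]]

  /** Registers value `b` for partition `key` at processing time `now` and
    * returns (SUM(b), MIN(b)) over the frame [now - rangeMillis, now].
    * Assumes `now` never decreases, as processing time does not.
    */
  def process(key: String, now: Long, b: Long): (Long, Long) = {
    val buckets = state.getOrElseUpdate(key, mutable.TreeMap.empty[Long, Vector[Long]])
    buckets.update(now, buckets.getOrElse(now, Vector.empty[Long]) :+ b)

    // The sorted map keeps the oldest timestamp at its head, so eviction can
    // stop at the first bucket that is still inside the range.
    val limit = now - rangeMillis
    while (buckets.headOption.exists { case (ts, _) => ts < limit })
      buckets -= buckets.head._1

    // MIN cannot be retracted on eviction, so recompute over what is left.
    val values = buckets.valuesIterator.flatten.toVector
    (values.sum, values.min)
  }

  /** Number of timestamp buckets currently retained for `key`. */
  def bucketCount(key: String): Int = state.get(key).map(_.size).getOrElse(0)
}
```

This sketch emits a result as soon as a row arrives, so a row does not see peers that arrive later with the same millisecond timestamp. A Flink `ProcessFunction` could instead buffer rows and emit them from a processing-time timer registered through `ctx.timerService()`, so that all peers are included; that design choice is left open here.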
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947354#comment-15947354 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108704205
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedProcessingOverProcessFunction.scala ---
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.aggregate
+
+import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.table.functions.{ Accumulator, AggregateFunction }
+import org.apache.flink.types.Row
+import org.apache.flink.util.{ Collector, Preconditions }
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import scala.util.control.Breaks._
+import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import scala.collection.mutable.Queue
+import org.apache.flink.api.common.typeinfo.TypeHint
--- End diff --
Please remove the unused imports.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15947359#comment-15947359 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/3641#discussion_r108702149
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
@@ -17,24 +17,38 @@
  */
 package org.apache.flink.table.plan.nodes.datastream
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import java.util.{ List => JList }
+
+import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
+import org.apache.calcite.plan.RelOptCluster
+import org.apache.calcite.plan.RelTraitSet
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.RelWriter
+import org.apache.calcite.rel.SingleRel
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
-import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.calcite.rel.core.Window
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.sql.`type`.BasicSqlType
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.table.api.{StreamTableEnvironment, TableException}
+import org.apache.flink.table.api.StreamTableEnvironment
+import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.runtime.aggregate._
+import org.apache.flink.table.functions.ProcTimeType
+import org.apache.flink.table.functions.RowTimeType
 import org.apache.flink.table.plan.nodes.OverAggregate
+import org.apache.flink.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.types.Row
-import org.apache.calcite.rel.core.Window
-import org.apache.calcite.rel.core.Window.Group
-import java.util.{List => JList}
-
+import org.apache.calcite.sql.`type`.IntervalSqlType
+import org.apache.calcite.rex.RexInputRef
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.util.Collector
+import java.util.concurrent.TimeUnit
--- End diff --
Please remove the unused imports.
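The translation node in this diff imports `TableException`, `ProcTimeType`, `IntervalSqlType` and `TimeUnit`, which fits a step that rejects OVER windows outside the initial restrictions and turns the `INTERVAL '1' HOUR PRECEDING` bound into the `timeBoundary: Long` taken by the process function. The sketch below models such a step in plain Scala. It is hypothetical: `OverSpec`, `Preceding` and `OverValidation` are invented stand-ins for Calcite's window groups, the boundary is assumed to be in milliseconds, and errors are returned as strings where Flink code would more likely throw a `TableException`.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical, simplified model of one OVER window; these are not Calcite
// or Flink classes.
sealed trait Preceding
case object UnboundedPreceding extends Preceding
final case class IntervalPreceding(amount: Long, unit: TimeUnit) extends Preceding

final case class OverSpec(
    partitionBy: Seq[String],
    orderByProcTime: Boolean,
    preceding: Preceding,
    hasFollowing: Boolean)

object OverValidation {
  /** Checks the initial restrictions listed in FLINK-5654 and, if they hold,
    * returns the preceding range in milliseconds (assumed unit of the
    * boundary). PARTITION BY is optional, so it is not checked.
    */
  def validate(overs: Seq[OverSpec]): Either[String, Long] =
    overs.distinct match {
      case Seq(spec) =>
        if (!spec.orderByProcTime) Left("ORDER BY must be procTime()")
        else if (spec.hasFollowing) Left("FOLLOWING is not supported")
        else spec.preceding match {
          case UnboundedPreceding =>
            Left("UNBOUNDED PRECEDING is not supported (see FLINK-5657)")
          case IntervalPreceding(amount, unit) =>
            Right(unit.toMillis(amount)) // e.g. 1 HOUR -> 3600000 ms
        }
      case Seq() => Left("no OVER clause")
      case _     => Left("all OVER clauses in a SELECT must be identical")
    }
}
```

With the two identical one-hour windows from the example query (`SUM(b)` and `MIN(b)`), the sketch accepts the spec and yields a boundary of 3 600 000 ms; mixing two different OVER clauses, or using UNBOUNDED PRECEDING, is rejected.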
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946870#comment-15946870 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 I have reopened this PR.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946871#comment-15946871 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran closed the pull request at: https://github.com/apache/flink/pull/3607
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946869#comment-15946869 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3641 @sunjincheng121 @fhueske To undo the mess left by the rebase, which pulled in other commits, I have reopened the PR. I will close the previous one. Please consider this one; it is the same.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946868#comment-15946868 ] ASF GitHub Bot commented on FLINK-5654: --- GitHub user rtudoran opened a pull request: https://github.com/apache/flink/pull/3641
[FLINK-5654] - Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
Added tests based on the harness util framework.
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.
- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)
- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added
- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/huawei-flink/flink FLINK-5654Re3_2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3641.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #3641
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946682#comment-15946682 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 @fhueske Of course I understand, and I do work on the latest master branch when starting an implementation. As mentioned, the initial commit made on Friday evening was based on the branch from Friday morning. I am not sure what the order is for integrating PRs and the like, but ours are definitely not in the priority queue, and therefore we are constantly being pushed into this situation. So, although I appreciate the good nature of the comment, please consider the context. Nevertheless, please review the PR and decide whether you are going to integrate it or not.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15946496#comment-15946496 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3607 Hi @rtudoran, when opening a PR, we should keep our commits based on the latest master branch, so that the PR only contains the changes for this issue and is easy for others to review. I checked your `Commits` list and changes; something went wrong when you rebased the code. When I rebase code, I usually use the following git commands:
```
git checkout myBranch
git remote add upstream https://github.com/apache/flink.git
git fetch upstream
git rebase upstream/master
```
Everyone's local environment is different, so the above is for reference only.
Best,
SunJincheng
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944652#comment-15944652 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 I just noticed that there are 64 files changed... this is definitely due to the rebase.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944650#comment-15944650 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 - Apperently there were many conflicts...most of them were stupid conflicts when git was trying to do the automatic merging. At least in 2 classes the methods you added and the ones i added were put one into the other...instead of one after the other. If you look over the classes you we see that actually there are not so many actual modifications :) > Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL > - > > Key: FLINK-5654 > URL: https://issues.apache.org/jira/browse/FLINK-5654 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: radu > > The goal of this issue is to add support for OVER RANGE aggregations on > processing time streams to the SQL interface. > Queries similar to the following should be supported: > {code} > SELECT > a, > SUM(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS sumB, > MIN(b) OVER (PARTITION BY c ORDER BY procTime() RANGE BETWEEN INTERVAL '1' > HOUR PRECEDING AND CURRENT ROW) AS minB > FROM myStream > {code} > The following restrictions should initially apply: > - All OVER clauses in the same SELECT clause must be exactly the same. > - The PARTITION BY clause is optional (no partitioning results in single > threaded execution). > - The ORDER BY clause may only have procTime() as parameter. procTime() is a > parameterless scalar function that just indicates processing time mode. > - UNBOUNDED PRECEDING is not supported (see FLINK-5657) > - FOLLOWING is not supported. > The restrictions will be resolved in follow up issues. 
If we find that some > of the restrictions are trivial to address, we can add the functionality in > this issue as well. > This issue includes: > - Design of the DataStream operator to compute OVER ROW aggregates > - Translation from Calcite's RelNode representation (LogicalProject with > RexOver expression). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
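The query in the issue description computes, for every arriving row, SUM(b) and MIN(b) over all rows of the same partition `c` whose processing time lies in the preceding one-hour interval. A minimal, stdlib-only Java sketch of that semantics follows. It is not the operator from the PR and does not use Flink APIs; the class and method names (`ProcTimeRangeOver`, `onRow`) are hypothetical. It assumes rows arrive in non-decreasing processing-time order, takes the timestamp as an explicit argument instead of reading a clock, evicts expired rows eagerly when a new row arrives, and does not treat rows with identical timestamps as RANGE peers (a row only sees rows that arrived before it).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of
 *   SUM(b), MIN(b) OVER (PARTITION BY c ORDER BY procTime()
 *                        RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW)
 * Assumes rows arrive in non-decreasing processing-time order.
 */
public class ProcTimeRangeOver {

    /** Aggregate values emitted for one input row. */
    public static final class Result {
        public final long sum;
        public final long min;

        Result(long sum, long min) {
            this.sum = sum;
            this.min = min;
        }
    }

    /** Per-partition state: buffered rows plus incrementally maintained accumulators. */
    private static final class PartitionState {
        final Deque<long[]> rows = new ArrayDeque<>();         // {procTime, b}, oldest first
        final TreeMap<Long, Integer> counts = new TreeMap<>(); // multiset of b values, for MIN
        long sum = 0;
    }

    private final long intervalMs;
    private final Map<String, PartitionState> partitions = new HashMap<>();

    public ProcTimeRangeOver(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Processes one row of partition c with value b that arrived at procTime (ms). */
    public Result onRow(String c, long b, long procTime) {
        PartitionState st = partitions.computeIfAbsent(c, key -> new PartitionState());

        // Retract rows that fell out of the range [procTime - intervalMs, procTime].
        while (!st.rows.isEmpty() && st.rows.peekFirst()[0] < procTime - intervalMs) {
            long old = st.rows.pollFirst()[1];
            st.sum -= old;
            // Decrement the count of this value; drop the entry when it reaches zero.
            st.counts.computeIfPresent(old, (v, n) -> n == 1 ? null : n - 1);
        }

        // Accumulate the current row.
        st.rows.addLast(new long[] {procTime, b});
        st.sum += b;
        st.counts.merge(b, 1, Integer::sum);

        return new Result(st.sum, st.counts.firstKey());
    }
}
```

For example, with a one-hour interval (3,600,000 ms), a row of partition `x` arriving at 3,600,500 ms no longer sees an earlier `x` row that arrived at 0 ms, but still sees one that arrived at 1,000 ms. MIN is kept in a value-to-count map so that expired values can be retracted without rescanning the buffer. Without PARTITION BY, every row would map to the same key, which is why the issue notes that unpartitioned execution is single-threaded. In Flink itself the buffer would live in keyed state and cleanup could also be driven by timers; the sketch omits both.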
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15944625#comment-15944625 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3607 Hi @rtudoran, I am a bit confused about why we need to change so many files. Best, SunJincheng
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15943893#comment-15943893 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 @fhueske @stefanobortoli I rebased and created the tests. Please have a look
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942944#comment-15942944 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3590 @fhueske Thanks, Fabian, for the note. I will rebase the PR today, as the retraction support has apparently been merged. I will also try to enhance the tests. Hopefully everything will be ready by tomorrow.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942407#comment-15942407 ] ASF GitHub Bot commented on FLINK-5654: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3590 Hi @rtudoran, thanks for doing the benchmark and posting the numbers! The recommended state backend for production settings is the RocksDB backend (see [production-readiness docs](https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/production_ready.html#choice-of-state-backend)). The in-memory backends store state as objects on the heap and can easily kill the JVM with an OutOfMemoryError. Also, the in-memory backends do not de/serialize data, so there is no actual advantage in using the MapState, which was mainly motivated by the reduced serialization effort. There are plans to implement a state backend using managed memory (similar to the batch algorithms). This backend would also serialize and deserialize data to/from pre-allocated byte arrays. So optimizing for de/serialization makes sense, IMO. The `KeyedOneInputStreamOperatorTestHarness` is part of the `flink-streaming-java` test-jar artifact. You can use it by adding the following dependency to the `flink-table` `pom.xml`:
```
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```
I'll have a detailed look at your PR tomorrow. Best, Fabian
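Fabian's point about MapState can be made concrete with a back-of-the-envelope cost model. With a serializing backend such as RocksDB, keeping all buffered rows of a key in one list-valued state entry means every append deserializes the whole list and serializes it back, whereas a map-valued state keyed by timestamp (an assumed layout here) only writes the entry for the new row. The stdlib-only Java sketch below counts serialized rows under those two assumed access patterns for n appends with distinct timestamps. It does not use Flink's state API, the names (`SerializationCostModel`, `listStateCost`, `mapStateCost`) are hypothetical, and it ignores the reads needed to retract expired rows. With a heap backend that keeps plain objects, both costs would effectively be zero, which is consistent with the in-memory benchmark not showing MapState's benefit.

```java
/**
 * Hypothetical cost model, not Flink's state API: counts how many buffered
 * rows are (de)serialized when n rows of one key are appended one by one.
 */
public class SerializationCostModel {

    /**
     * All rows in a single list-valued state entry: append i reads
     * (deserializes) the i rows already buffered and writes (serializes)
     * the list back with i + 1 rows. Total = n * n.
     */
    public static long listStateCost(int n) {
        long cost = 0;
        for (int i = 0; i < n; i++) {
            cost += i;      // deserialize the existing list
            cost += i + 1;  // serialize the list including the new row
        }
        return cost;
    }

    /**
     * Rows in a map-valued state keyed by timestamp: with distinct
     * timestamps, each append serializes only its own one-row entry.
     */
    public static long mapStateCost(int n) {
        return n;
    }

    public static void main(String[] args) {
        for (int n : new int[] {10, 100, 1000}) {
            System.out.printf("n=%d list=%d map=%d%n", n, listStateCost(n), mapStateCost(n));
        }
    }
}
```

Under these assumptions the list layout grows quadratically (100, 10,000 and 1,000,000 serialized rows for n = 10, 100, 1000) while the map layout grows linearly.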
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942299#comment-15942299 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3607 @sunjincheng121 I will take a look, although the branch was only created Friday morning, so there should not be any conflicts or anything like that...
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942297#comment-15942297 ] ASF GitHub Bot commented on FLINK-5654: --- Github user rtudoran commented on the issue: https://github.com/apache/flink/pull/3590 Hi @aljoscha, sorry for not presenting the benchmark/evaluation more clearly. I used the in-memory state backend (for us the key scenario is high performance, which is why I considered the in-memory case).
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942230#comment-15942230 ] ASF GitHub Bot commented on FLINK-5654: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3590 @rtudoran Which state backend did you use? I think this can have a great impact on performance.
[jira] [Commented] (FLINK-5654) Add processing time OVER RANGE BETWEEN x PRECEDING aggregation to SQL
[ https://issues.apache.org/jira/browse/FLINK-5654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941412#comment-15941412 ] ASF GitHub Bot commented on FLINK-5654: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3607 Hi @rtudoran, thanks for this PR. It looks very promising. Please rebase the code on master first; then I'll be glad to take a look at the changes. Best, SunJincheng