[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3252

Merging

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3252

Thanks for the update @haohui. PR is good to merge.
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252

Discussed with @fhueske offline. Thanks a lot for the comments. The V4 PR implements the following:

* Rebased on top of #3101.
* `LogicalWindowAggregateRule` implements `RelOptRule` instead of `ConverterRule`.
* Instead of adding a new field in `LogicalWindowAggregate`, the implementation now transforms the original `LogicalAggregate(LogicalProject(...))` expression into `LogicalProject(LogicalWindowAggregate(LogicalProject(...)))`. The outermost projection ensures that the operators have the same row types before and after the transformation.

Please take another look.
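For a `FLOOR(ROWTIME() TO HOUR)` query like the one discussed elsewhere in this thread, the rewrite in the last bullet can be sketched as follows (the plan shapes are illustrative, not exact optimizer output):

```
-- before: the group-by expression is computed in a projection under the aggregate
LogicalAggregate(group={$0}, agg={COUNT(*)})
  LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
    ...

-- after: the window absorbs the time expression; the outermost projection
-- restores the original row type so Calcite's type checks still pass
LogicalProject(...)
  LogicalWindowAggregate(window=[tumbling, 1h], agg={COUNT(*)})
    LogicalProject(...)
```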
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252

The v3 PR gets the best of both worlds -- the code generator will throw exceptions if the queries actually execute `rowtime()`. Essentially it rewrites the project and the aggregate operators before passing them into the Volcano planner. @fhueske please take another look.
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252

Adding `ROWTIME()` as an expression to enable users to specify event-time windows. After trying multiple approaches, I settled on translating `LogicalAggregate` directly to `DataStreamAggregate`. The translation removes the group-by expression from the aggregate and adds the same expression as a window.

Note that `ROWTIME()` is actually translated to a call to the local timestamp. The expression has to be executable because Calcite creates a new project operator to compute the group-by expression. For example, the following query

```
SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
```

will be translated to:

```
LogicalAggregate(group={$0}, agg={COUNT(*)})
  LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
    ...
```

It's tempting to remove the group-by expression from the logical plan. However, it cannot be done using the optimization frameworks in Calcite. These frameworks expect the output types of the operators to stay the same before and after the transformations. Removing the field changes the types, thus Calcite will complain.

The downside of this approach is that it might be difficult for Flink to catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() / 2 TO HOUR)` at compile time. Any ideas to improve the situation?
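The reason `FLOOR(... TO HOUR)` works as a tumbling-window key is that it maps every timestamp to the start of its containing hour, so all records of one hour share one group. A minimal sketch of that bucketing logic (plain Python with illustrative names, not Flink code):

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000  # window size in milliseconds

def window_start(timestamp_ms: int) -> int:
    """FLOOR(ts TO HOUR): align a timestamp to the start of its hour."""
    return (timestamp_ms // HOUR_MS) * HOUR_MS

def count_per_hour(timestamps):
    """SELECT COUNT(*) ... GROUP BY FLOOR(ROWTIME() TO HOUR), sketched eagerly."""
    counts = defaultdict(int)
    for ts in timestamps:
        counts[window_start(ts)] += 1
    return dict(counts)
```

For example, records at 10:15, 10:45, and 11:05 fall into two hourly buckets, keyed by 10:00 and 11:00. An expression like `FLOOR(ROWTIME() / 2 TO HOUR)` no longer has this alignment property, which is why such queries are malformed as window definitions.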
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3252

Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` clause.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3252

Hi @haohui, thanks for your contribution!

The referenced JIRA is about adding support for group windows to SQL, not OVER (or row) windows. It should enable queries such as:

```
SELECT a, SUM(b) AS sumB, TUMBLE_END(rowtime(), INTERVAL '1' HOUR) AS t
FROM myT
GROUP BY TUMBLE(rowtime(), INTERVAL '1' HOUR), a;
```

I saw that you contributed `TUMBLE` to Calcite just recently, so this feature is not yet available in a Calcite release that we could link against. Until then, we could add support for the more manual version of SQL tumbling windows:

```
SELECT a, SUM(b) AS sumB, CEIL(rowtime() TO HOUR) AS t
FROM myT
GROUP BY CEIL(rowtime() TO HOUR), a
```

We would also need to find a way to reference the `rowtime`. We do not want to expose this as an actual attribute in Flink's SQL (internally, Flink treats record timestamps as metadata which may not be modified by a query). The current approach would be to implement a built-in function which serves as a marker and is replaced during the translation.

Best, Fabian
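The `CEIL` variant labels each group with the window end rather than the window start, and also keys on a regular attribute `a`. A sketch of that combined grouping (plain Python with illustrative names, not Flink's implementation):

```python
from collections import defaultdict

HOUR_MS = 60 * 60 * 1000  # window size in milliseconds

def window_end(timestamp_ms: int) -> int:
    """CEIL(ts TO HOUR): round a timestamp up to the next hour boundary."""
    return -(-timestamp_ms // HOUR_MS) * HOUR_MS  # ceiling division

def sum_b_per_key_and_hour(rows):
    """SELECT a, SUM(b), CEIL(rowtime TO HOUR) ... GROUP BY CEIL(rowtime TO HOUR), a."""
    acc = defaultdict(int)
    for a, b, ts in rows:
        acc[(a, window_end(ts))] += b
    return dict(acc)
```

One subtlety: SQL `CEIL` maps a timestamp that already sits exactly on an hour boundary to itself, which differs slightly from the half-open `[start, end)` semantics of a tumbling window that would assign such a record to the next window's end.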