[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-16 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3252
  
Merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-16 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3252
  
Thanks for the update @haohui.
PR is good to merge.


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-14 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
Discussed with @fhueske offline. Thanks a lot for the comments. The V4 PR 
implements the following:

* Rebased on top of #3101 
* `LogicalWindowAggregateRule` extends `RelOptRule` instead of 
`ConverterRule`.
* Instead of adding a new field in `LogicalWindowAggregate`, the 
implementation now transforms the original `LogicalAggregate(LogicalProject())` 
expression to `LogicalProject(LogicalWindowAggregate(LogicalProject(...)))`. 
The outermost projection ensures that the operators have the same row types 
before and after the transformation.
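To illustrate why the outermost projection is needed, here is a toy Python model of the rewrite. It is a sketch under stated assumptions, not the PR's code: the node classes are hypothetical stand-ins for Calcite's `RelNode`s, and the placeholder that fills the removed window-key column is invented for illustration. The point it shows is that the rewritten plan reports the same row type as the original `Aggregate`.

```python
from dataclasses import dataclass
from typing import Tuple, Union

# Toy plan nodes for illustration only; these are NOT Calcite's RelNode classes.
# Each node reports its output row type as a tuple of field names.


@dataclass(frozen=True)
class Scan:
    fields: Tuple[str, ...]

    def row_type(self) -> Tuple[str, ...]:
        return self.fields


@dataclass(frozen=True)
class Project:
    input: "Node"
    exprs: Tuple[Tuple[str, str], ...]  # (output name, expression text)

    def row_type(self) -> Tuple[str, ...]:
        return tuple(name for name, _ in self.exprs)


@dataclass(frozen=True)
class Aggregate:
    input: "Node"
    group: Tuple[str, ...]  # names of grouping fields
    aggs: Tuple[str, ...]   # names of aggregate results

    def row_type(self) -> Tuple[str, ...]:
        return self.group + self.aggs


@dataclass(frozen=True)
class WindowAggregate:
    input: "Node"
    window: str             # window expression, e.g. "FLOOR(ROWTIME() TO HOUR)"
    group: Tuple[str, ...]  # remaining (non-window) grouping fields
    aggs: Tuple[str, ...]

    def row_type(self) -> Tuple[str, ...]:
        return self.group + self.aggs


Node = Union[Scan, Project, Aggregate, WindowAggregate]


def rewrite_window_aggregate(agg: Aggregate, window_key: str) -> Project:
    """Rewrite Aggregate(Project(...)) into Project(WindowAggregate(Project(...)))."""
    proj = agg.input
    if not isinstance(proj, Project):
        raise TypeError("expected Aggregate(Project(...))")
    window_expr = dict(proj.exprs)[window_key]
    # The inner projection no longer computes the window key ...
    inner = Project(proj.input, tuple(e for e in proj.exprs if e[0] != window_key))
    # ... because the window aggregate now carries it as its window definition.
    win = WindowAggregate(
        inner,
        window_expr,
        tuple(g for g in agg.group if g != window_key),
        agg.aggs,
    )
    # The outer projection restores the original row type; the window key
    # column is filled with a hypothetical placeholder expression.
    outer = tuple((f, "PLACEHOLDER" if f == window_key else f) for f in agg.row_type())
    return Project(win, outer)


# Usage: ... GROUP BY FLOOR(ROWTIME() TO HOUR), a
scan = Scan(("a", "b"))
proj = Project(scan, (("$0", "FLOOR(ROWTIME() TO HOUR)"), ("a", "a")))
agg = Aggregate(proj, group=("$0", "a"), aggs=("cnt",))
new_plan = rewrite_window_aggregate(agg, "$0")
print(new_plan.row_type())  # ('$0', 'a', 'cnt'), identical to agg.row_type()
```

Without the outer `Project`, the plan's row type would shrink to `('a', 'cnt')`. That is exactly the kind of type change a type-preserving rule framework rejects.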

Please take another look.


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-13 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
The v3 PR gets the best of both worlds: the code generator throws an 
exception if a query actually executes `rowtime()`.
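A minimal Python sketch of the "marker that must never execute" idea follows. The expression classes and the `evaluate` interpreter are hypothetical stand-ins for Flink's generated code, not its actual code generator. `ROWTIME()` is accepted as a node in the tree, but evaluating it raises.

```python
from dataclasses import dataclass
from typing import Mapping, Tuple, Union


# Toy expression tree (hypothetical; not Flink's RexNode or code generator).
@dataclass(frozen=True)
class Field:
    name: str


@dataclass(frozen=True)
class Call:
    op: str
    args: Tuple["Expr", ...] = ()


Expr = Union[Field, Call]


class RowtimeNotExecutable(Exception):
    """Raised when a ROWTIME() marker reaches runtime evaluation."""


def evaluate(expr: Expr, row: Mapping[str, int]) -> int:
    """Interpret an expression against one row, standing in for generated code."""
    if isinstance(expr, Field):
        return row[expr.name]
    if expr.op == "ROWTIME":
        # The marker only tells the planner where the event-time window is;
        # if it survives until execution, the query was not rewritten.
        raise RowtimeNotExecutable("ROWTIME() cannot be evaluated at runtime")
    if expr.op == "PLUS":
        return sum(evaluate(a, row) for a in expr.args)
    raise ValueError(f"unsupported operator: {expr.op}")


# Usage
print(evaluate(Call("PLUS", (Field("a"), Field("b"))), {"a": 1, "b": 2}))  # 3
try:
    evaluate(Call("PLUS", (Call("ROWTIME"), Field("a"))), {"a": 1})
except RowtimeNotExecutable as err:
    print("rejected:", err)
```

With this design, a correctly rewritten query never reaches the raising branch. A query that uses `ROWTIME()` in a position the planner does not recognize fails loudly instead of silently using some wall-clock value.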

Essentially, it rewrites the project and aggregate operators before passing 
them to the Volcano planner. @fhueske please take another look.


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-07 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
This adds `ROWTIME()` as an expression that lets users specify event-time 
windows.

After trying multiple approaches, I settled on translating `LogicalAggregate` 
directly to `DataStreamAggregate`. The translation removes the group-by 
expression from the aggregate and adds the same expression as a window.
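As an illustration of "remove the group-by expression and add it as a window", here is a Python sketch over a hypothetical toy encoding of GROUP BY keys (not Calcite's API). It recognizes a `FLOOR(ROWTIME() TO unit)` key, turns it into a tumbling window of the corresponding size, and returns the remaining keys unchanged. The unit table is an assumption limited to fixed-length units.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


# Hypothetical toy encoding of GROUP BY keys (not Calcite's RexNode API).
@dataclass(frozen=True)
class Key:
    op: str                     # "FLOOR" or "FIELD"
    arg: str                    # operand, e.g. "ROWTIME()" or a field name
    unit: Optional[str] = None  # time unit of FLOOR(... TO unit)


@dataclass(frozen=True)
class TumblingWindow:
    size_ms: int


# Sizes for fixed-length units only (assumption: MONTH/YEAR are left out
# because they have no fixed length in milliseconds).
UNIT_MS = {"SECOND": 1_000, "MINUTE": 60_000, "HOUR": 3_600_000, "DAY": 86_400_000}


def split_window(keys: Tuple[Key, ...]) -> Tuple[Optional[TumblingWindow], Tuple[Key, ...]]:
    """Remove a FLOOR(ROWTIME() TO unit) key from the group-by keys and return
    it as a tumbling window, together with the remaining keys."""
    window: Optional[TumblingWindow] = None
    rest = []
    for key in keys:
        if key.op == "FLOOR" and key.arg == "ROWTIME()" and key.unit in UNIT_MS:
            if window is not None:
                raise ValueError("only one event-time window per aggregate")
            window = TumblingWindow(UNIT_MS[key.unit])
        else:
            rest.append(key)
    return window, tuple(rest)


# Usage: GROUP BY FLOOR(ROWTIME() TO HOUR), a
window, rest = split_window((Key("FLOOR", "ROWTIME()", "HOUR"), Key("FIELD", "a")))
print(window, rest)  # TumblingWindow(size_ms=3600000) (Key(op='FIELD', arg='a', unit=None),)
```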

Note that `ROWTIME()` is actually translated to a call to the local 
timestamp. The expression has to be executable because Calcite creates a new 
project operator to compute the group-by expression, and that operator 
evaluates it. For example, the following query

```
SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
```

will be translated to:

```
LogicalAggregate(group={$0}, agg={COUNT(*)})
  LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
  ...
```
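To make the semantics of that `GROUP BY` key concrete, the Python sketch below evaluates the same query over a finite batch of record timestamps. It assumes epoch-millisecond timestamps in UTC and is unrelated to Flink's actual operators. Each timestamp is floored to the hour, and rows are counted per hourly bucket.

```python
from collections import Counter
from typing import Dict, Iterable

HOUR_MS = 3_600_000


def floor_to_hour(ts_ms: int) -> int:
    """FLOOR(ts TO HOUR) for a timestamp in epoch milliseconds (UTC)."""
    return ts_ms - ts_ms % HOUR_MS


def count_per_hour(timestamps: Iterable[int]) -> Dict[int, int]:
    """COUNT(*) ... GROUP BY FLOOR(ts TO HOUR) over a finite batch of timestamps."""
    return dict(Counter(floor_to_hour(ts) for ts in timestamps))


print(count_per_hour([0, 59 * 60_000, HOUR_MS, 2 * HOUR_MS - 1]))
# {0: 2, 3600000: 2}
```

On an unbounded stream, the runtime must also decide when an hourly bucket is complete before emitting its count. This batch sketch does not model that step.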

It's tempting to remove the group-by expression from the logical plan. 
However, this cannot be done with Calcite's optimization frameworks, which 
expect the output types of operators to stay the same before and after a 
transformation. Removing the field changes the row type, so Calcite complains.

The downside of this approach is that it might be difficult for Flink to 
catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY 
FLOOR(ROWTIME() / 2 TO HOUR)` at compile time. Any ideas to improve the 
situation?
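One possible direction is to validate the GROUP BY keys before code generation. This is sketched here as an illustration only, not as what the PR implements. The check accepts `ROWTIME()` only as the direct operand of `FLOOR(... TO unit)` and rejects every other use, so `FLOOR(ROWTIME() / 2 TO HOUR)` fails at planning time. The expression classes and the unit whitelist below are hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable, Tuple, Union


# Hypothetical toy expression tree for GROUP BY keys.
@dataclass(frozen=True)
class Call:
    op: str
    args: Tuple["Expr", ...] = ()
    unit: str = ""  # only set for FLOOR(... TO unit)


@dataclass(frozen=True)
class Lit:
    value: int


Expr = Union[Call, Lit]

TIME_UNITS = {"SECOND", "MINUTE", "HOUR", "DAY"}  # assumed whitelist


def contains_rowtime(e: Expr) -> bool:
    """True if ROWTIME() appears anywhere inside the expression."""
    return isinstance(e, Call) and (
        e.op == "ROWTIME" or any(contains_rowtime(a) for a in e.args)
    )


def is_window_key(e: Expr) -> bool:
    """True only for FLOOR(ROWTIME() TO unit) with ROWTIME() as the direct operand."""
    return (
        isinstance(e, Call)
        and e.op == "FLOOR"
        and e.args == (Call("ROWTIME"),)
        and e.unit in TIME_UNITS
    )


def validate_group_keys(keys: Iterable[Expr]) -> None:
    """Reject any GROUP BY key that uses ROWTIME() outside the supported pattern."""
    for key in keys:
        if contains_rowtime(key) and not is_window_key(key):
            raise ValueError(f"unsupported use of ROWTIME() in GROUP BY: {key}")


ok = Call("FLOOR", (Call("ROWTIME"),), "HOUR")
bad = Call("FLOOR", (Call("DIVIDE", (Call("ROWTIME"), Lit(2))),), "HOUR")
validate_group_keys([ok])  # accepted
try:
    validate_group_keys([bad])  # FLOOR(ROWTIME() / 2 TO HOUR)
except ValueError as err:
    print(err)
```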


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-06 Thread haohui
Github user haohui commented on the issue:

https://github.com/apache/flink/pull/3252
  
Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` 
clause.


[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

2017-02-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3252
  
Hi @haohui, thanks for your contribution!

The referenced JIRA is about adding support for group windows to SQL, not 
OVER (or row) windows. It should enable queries such as:

```
SELECT a, sum(b) as sumB, TUMBLE_END(rowtime(), INTERVAL '1' HOUR) AS t
  FROM myT
GROUP BY TUMBLE(rowtime(), INTERVAL '1' HOUR), a;
```

I saw that you contributed `TUMBLE` to Calcite only very recently, so this 
feature is not yet available in a Calcite release that we could link against. 
Until then, we could add support for the more manual version of SQL tumbling 
windows:

```
SELECT a, SUM(b) AS sumB, CEIL(rowtime() TO HOUR) AS t
  FROM myT
GROUP BY CEIL(rowtime() TO HOUR), a
```
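The `CEIL`-based query is close to the `TUMBLE` version, but the two differ at window boundaries. The Python sketch below makes this concrete. It assumes epoch-millisecond timestamps, and it assumes that a one-hour tumbling window covers the half-open interval `[start, start + size)` with `TUMBLE_END` as its exclusive end. Under those assumptions, both agree for timestamps strictly inside an hour and disagree for a timestamp that falls exactly on the hour.

```python
HOUR_MS = 3_600_000


def tumble_end(ts_ms: int, size_ms: int = HOUR_MS) -> int:
    """Exclusive end of the tumbling window [start, start + size) containing ts."""
    return ts_ms - ts_ms % size_ms + size_ms


def ceil_to_hour(ts_ms: int) -> int:
    """CEIL(ts TO HOUR): the smallest whole hour that is >= ts."""
    return -(-ts_ms // HOUR_MS) * HOUR_MS


for ts in (1, 30 * 60_000, HOUR_MS):
    print(ts, tumble_end(ts), ceil_to_hour(ts))
# 1 3600000 3600000
# 1800000 3600000 3600000
# 3600000 7200000 3600000
```

Under these assumptions, a record stamped exactly on a full hour is grouped with the preceding hour by the `CEIL` version but opens a new window with `TUMBLE`.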

We would also need to find a way to reference the `rowtime`. We do not want 
to expose this as an actual attribute in Flink's SQL (internally, Flink treats 
record timestamps as metadata which may not be modified by a query). The 
current approach would be to implement a built-in function which serves as a 
marker and is replaced during the translation.

Best, Fabian

