radu created FLINK-6082:
---------------------------

             Summary: Support window definition for SQL Queries based on WHERE 
clause with time condition
                 Key: FLINK-6082
                 URL: https://issues.apache.org/jira/browse/FLINK-6082
             Project: Flink
          Issue Type: Sub-task
          Components: Table API & SQL
            Reporter: radu


Time target: Proc Time

The Calcite documentation gives query examples where the (time)
boundaries are defined as conditions in the WHERE clause. As the Flink
community targets compatibility with Calcite, it makes sense to support
the definition of windows via this method, as well as the corresponding
aggregations on top of them.

SQL targeted query examples:
----------------------------

```SELECT productId, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp```

General comments:

1)  Window boundaries are defined as conditions in the WHERE clause.

2)  To indicate which stream time is used, rowtime and proctime can be
    used.

3)  The boundaries are defined with the special constructs provided by
    Calcite: current_timestamp and time operations.

Description:
------------

The logic of this operator is closely related to supporting aggregates
over sliding windows defined with OVER
([FLINK-5653](https://issues.apache.org/jira/browse/FLINK-5653),
[FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654),
[FLINK-5655](https://issues.apache.org/jira/browse/FLINK-5655),
[FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658),
[FLINK-5656](https://issues.apache.org/jira/browse/FLINK-5656)). In those
issues the design considered queries where the window is defined with the
syntax of the OVER clause and aggregates are applied over this period.
The behavior here is the same, with the only exception that the window
boundaries are defined by the WHERE conditions. Apart from this, the
logic and the types of aggregates to be supported should be the same
(sum, count, avg, min, max). Supporting these types of queries is
related to the pie chart problem tackled by Calcite.

As with the OVER windows, the construct should build rolling
windows (i.e., windows that are triggered and move with every incoming
event).
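
To make the rolling behavior concrete, here is a minimal, self-contained sketch of a window that is re-evaluated on every incoming event and reports the five aggregates named above. It is not Flink code: `Event`, `Aggregates`, and `RollingWindow` are hypothetical names introduced for illustration, and the sketch assumes events arrive in non-decreasing time order, as they would with processing time.

```scala
import scala.collection.mutable

// Hypothetical types for illustration only; they are not part of Flink.
final case class Event(timeMillis: Long, value: Double)

final case class Aggregates(count: Long, sum: Double, avg: Double, min: Double, max: Double)

// Buffers the events of the trailing window and recomputes the aggregates on every arrival.
// Assumes non-decreasing event times and a non-negative window size.
final class RollingWindow(windowMillis: Long) {
  private val buffer = mutable.Queue.empty[Event]

  def add(event: Event): Aggregates = {
    buffer.enqueue(event)
    // Evict everything older than (arrival time - window size); the lower bound is inclusive.
    val lowerBound = event.timeMillis - windowMillis
    while (buffer.head.timeMillis < lowerBound) buffer.dequeue()
    val values = buffer.map(_.value)
    Aggregates(values.size.toLong, values.sum, values.sum / values.size, values.min, values.max)
  }
}
```

Recomputing the aggregates from the buffer keeps the sketch short; an actual operator would more likely maintain them incrementally.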

Functionality example
---------------------

The example below illustrates how a window defined in the WHERE clause
behaves when working with streams.

`SELECT a, count(*) FROM stream1 WHERE proctime BETWEEN current_timestamp - INTERVAL '1' HOUR AND current_timestamp;`

||IngestionTime(Event)||Stream1||Output||
|10:00:01|Id1,10|Id1,1|
|10:02:00|Id2,2|Id2,2|
|11:25:00|Id3,2|Id3,1|
|12:03:00|Id4,15|Id4,2|
|12:05:00|Id5,11|Id5,3|
|12:56:00|Id6,20|Id6,3|
|...|
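
The counts in the Output column can be checked with a small standalone sketch. It assumes that each row's output is the number of events whose ingestion time falls within the inclusive one-hour interval ending at that row's own ingestion time; `WindowTrace` is a hypothetical name used only here.

```scala
import java.time.{Duration, LocalTime}

// Hypothetical helper for illustration; it only replays the table above.
object WindowTrace {
  // Ingestion times of Id1 .. Id6.
  val arrivals: Vector[LocalTime] =
    Vector("10:00:01", "10:02:00", "11:25:00", "12:03:00", "12:05:00", "12:56:00")
      .map(s => LocalTime.parse(s))

  val window: Duration = Duration.ofHours(1)

  // For each arrival t, count the events with time in [t - 1 hour, t] (BETWEEN is inclusive).
  val counts: Vector[Int] = arrivals.map { t =>
    val lower = t.minus(window)
    arrivals.count(e => !e.isBefore(lower) && !e.isAfter(t))
  }
  // counts == Vector(1, 2, 1, 2, 3, 3), matching the Output column
}
```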

Implementation option
---------------------

Since the query has the same functionality as the aggregates over
windows, the implementation should follow that of the OVER clause.
Since the WHERE conditions are typically related to time, this means
that when one boundary is unbounded the
[FLINK-5658](https://issues.apache.org/jira/browse/FLINK-5658) design
should be used, while for bounded time windows the
[FLINK-5654](https://issues.apache.org/jira/browse/FLINK-5654) design
should be used.

The window boundaries will be extracted from the WHERE condition.

The rule will no longer be mapped to a LogicalWindow, which means that
the conversion would need to happen in the current DataStreamCalc rule.
To this end, a dedicated condition will be added so that, when the
WHERE clause has time conditions, the operator implementation of the
OVER clause (used in the previous issues) is used.

```
class DataStreamCalcRule {

  def convert(rel: RelNode): RelNode = {
    val calc: LogicalCalc = rel.asInstanceOf[LogicalCalc]
    val traitSet: RelTraitSet = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
    val convInput: RelNode = RelOptRule.convert(calc.getInput, DataStreamConvention.INSTANCE)

    // Pseudocode: the two conditions below are placeholders, not existing methods,
    // and the constructor arguments of the aggregate operators are omitted.
    if (WHERE contains time limits) {
      if (bounded) {
        // bounded time window (FLINK-5654 design)
        new DataStreamProcTimeTimeAggregate
      } else {
        // one unbounded boundary (FLINK-5658 design)
        new DataStreamSlideEventTimeRowAgg
      }
    } else {
      // no time condition: keep the existing conversion
      new DataStreamCalc(
        rel.getCluster,
        traitSet,
        convInput,
        rel.getRowType,
        calc.getProgram,
        description)
    }
  }
}
```
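
The dispatch described above (no time condition, one unbounded boundary, or a bounded window) can also be sketched as self-contained Scala. This is only a model of the decision, not of the Calcite or Flink classes: `TimeBound`, `TimeRange`, `Plan`, and `PlanChooser` are hypothetical names, and the sketch assumes the time bounds have already been extracted from the WHERE condition.

```scala
// Hypothetical model of the time bounds extracted from a WHERE condition;
// none of these types exist in Flink or Calcite.
sealed trait TimeBound
case object Unbounded extends TimeBound
// A bound of the form current_timestamp - offset (offset 0 means current_timestamp itself).
final case class OffsetMillis(millisBeforeNow: Long) extends TimeBound

final case class TimeRange(lower: TimeBound, upper: TimeBound)

sealed trait Plan
case object BoundedTimeAggregate extends Plan   // stands in for DataStreamProcTimeTimeAggregate
case object UnboundedTimeAggregate extends Plan // stands in for DataStreamSlideEventTimeRowAgg
case object PlainCalc extends Plan              // stands in for DataStreamCalc

object PlanChooser {
  // None means the WHERE clause has no time condition.
  def choose(range: Option[TimeRange]): Plan = range match {
    case None => PlainCalc
    case Some(TimeRange(Unbounded, _)) | Some(TimeRange(_, Unbounded)) => UnboundedTimeAggregate
    case Some(_) => BoundedTimeAggregate
  }
}
```

For the query in this issue, the range would be `TimeRange(OffsetMillis(3600000L), OffsetMillis(0L))`, which selects the bounded variant.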



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
