radu created FLINK-6260:
---------------------------

             Summary: Distinct Aggregates for Group By Windows
                 Key: FLINK-6260
                 URL: https://issues.apache.org/jira/browse/FLINK-6260
             Project: Flink
          Issue Type: New Feature
          Components: Table API & SQL
            Reporter: radu


Time target: ProcTime/EventTime

SQL targeted query examples:
------------

Q1. Boundaries are expressed in the GROUP BY clause, and DISTINCT is applied to 
the elements of the aggregate(s).

`SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() 
TO HOUR)`

Q2. Distinct is applied to the collection of outputs to be selected.

`SELECT STREAM DISTINCT procTime(), prodId  FROM stream1 GROUP BY 
FLOOR(procTime() TO DAY)`


=>  The DISTINCT operation makes sense only within the context of windows or 
other bounded structures. Otherwise the operator would have to keep an 
unbounded amount of state to ensure uniqueness, and for certain functions 
(e.g. aggregates) it would never trigger.

=>  We can follow the same design/implementation as for JIRA FLINK-6249 
(supporting Distinct Aggregates for OVER Windows)

=> We can consider as a sub-JIRA issue the implementation of DISTINCT for 
select clauses. 

=>  Aggregations over distinct elements without any boundary (i.e. within 
SELECT clause) do not make sense, just as aggregations do not make sense 
without groupings or windows.


If DISTINCT is applied to the group elements, as in the Q1 example, then we 
either define a new implementation (if selection is general) or extend the 
current implementation of grouped aggregates with distinct group aggregates.

If DISTINCT is applied to all selected elements, as in the Q2 example, then a 
new implementation needs to be defined. It would work over a specific window / 
ProcessFunction, and the uniqueness of the results would be enforced within 
the processing function. This would happen for each partition. The data 
structure used to track distinct elements would be reset with each new window 
(i.e., group-by scope).
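
A minimal, language-neutral sketch of the per-partition distinct state described above. It is written in plain Python rather than against the Flink API; the class name `DistinctSelectPerWindow` and its methods `process` and `on_window_end` are hypothetical and only illustrate the idea of a set that is cleared at every window boundary.

```python
from collections import defaultdict


class DistinctSelectPerWindow:
    """Emits each row at most once per partition within the current window."""

    def __init__(self):
        # partition key -> rows already emitted in the current window
        self._seen = defaultdict(set)

    def process(self, key, row):
        """Return [row] the first time it is seen for this key, else []."""
        if row in self._seen[key]:
            return []
        self._seen[key].add(row)
        return [row]

    def on_window_end(self):
        """Reset the distinct state when the window (group-by scope) closes."""
        self._seen.clear()


op = DistinctSelectPerWindow()
first_window = []
for key, row in [("p1", "ab"), ("p1", "aa"), ("p1", "ab"), ("p2", "ab")]:
    first_window += op.process(key, row)

op.on_window_end()
# The same row is emitted again because the state was reset with the window.
second_window = op.process("p1", "ab")
```

The duplicate `("p1", "ab")` is dropped inside the first window, while `("p2", "ab")` is still emitted because uniqueness is tracked per partition.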
        

Examples
------------
`Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

`Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`

||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
||10:00:01|(ab,1)| | |
||10:05:00|(aa,2)| | |
||11:00:00| |ab,aa|2|
||11:03:00|(aa,2)| | |
||11:09:00|(aa,2)| | |
||12:00:00| |aa|1|
|...|
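
The Q1 and Q2 columns of the table can be reproduced with a small plain-Python sketch. It assumes that each stream element is a pair `(b, value)` and that the timestamps in the table are the processing times of the elements; the helper `hourly_distinct` is hypothetical. Windows are keyed here by their start time, whereas the table shows each result at the time the window closes.

```python
from datetime import datetime

# (time, b, value) rows taken from the stream column of the table
events = [
    ("10:00:01", "ab", 1),
    ("10:05:00", "aa", 2),
    ("11:03:00", "aa", 2),
    ("11:09:00", "aa", 2),
]


def hourly_distinct(rows):
    """Group rows by FLOOR(time TO HOUR); keep distinct b values in arrival order."""
    windows = {}
    for ts, b, _ in rows:
        hour = datetime.strptime(ts, "%H:%M:%S").replace(minute=0, second=0)
        distinct_b = windows.setdefault(hour.strftime("%H:%M:%S"), [])
        if b not in distinct_b:
            distinct_b.append(b)
    return windows


result = hourly_distinct(events)
for window_start, distinct_b in result.items():
    # Q1 emits the distinct values, Q2 emits their count
    print(window_start, "Q1:", distinct_b, "Q2:", len(distinct_b))
```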


Implementation option
---------------------
Since the behavior is similar to the one implemented for OVER windows (with 
the difference that the distinction is reset for each group scope), the 
implementation will reuse the existing implementation of the OVER window 
functions. Distinction can be achieved within the aggregate itself or within 
the window/ProcessFunction logic that computes the aggregates. As multiple 
aggregates that require distinction can be computed at the same time, the 
preferred option is to create the distinction within the process logic. For 
the case of selecting distinct outputs (i.e., not aggregates) we can follow 
the same implementation design: support distinction in the aggregation and 
then emit only one output per element seen (instead of calling the aggregate 
method of the aggregates).
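
The preferred option, distinction inside the process logic rather than inside each aggregate, could look roughly like the sketch below. It is plain Python, not Flink code: `CountAgg`, `MinAgg` and `DistinctWindowLogic` are hypothetical names, and the `accumulate` / `get_value` methods are only assumed to resemble an aggregate interface. The aggregates stay unaware of DISTINCT; the surrounding logic filters duplicates and resets its state when the window fires.

```python
class CountAgg:
    """Plain COUNT aggregate with no knowledge of DISTINCT."""

    def __init__(self):
        self.count = 0

    def accumulate(self, value):
        self.count += 1

    def get_value(self):
        return self.count


class MinAgg:
    """Plain MIN aggregate with no knowledge of DISTINCT."""

    def __init__(self):
        self.minimum = None

    def accumulate(self, value):
        if self.minimum is None or value < self.minimum:
            self.minimum = value

    def get_value(self):
        return self.minimum


class DistinctWindowLogic:
    """Hypothetical process logic that filters duplicates before aggregating."""

    def __init__(self, specs):
        # specs: list of (field name, aggregate factory)
        self._specs = specs
        self.reset()

    def reset(self):
        # Fresh aggregates and fresh distinct sets for a new group scope
        self._aggs = [(field, factory()) for field, factory in self._specs]
        self._seen = [set() for _ in self._specs]

    def process(self, row):
        for (field, agg), seen in zip(self._aggs, self._seen):
            value = row[field]
            if value not in seen:  # only the first occurrence reaches the aggregate
                seen.add(value)
                agg.accumulate(value)

    def fire(self):
        """Emit the results for the closing window and reset the state."""
        results = [agg.get_value() for _, agg in self._aggs]
        self.reset()
        return results


logic = DistinctWindowLogic([("b", CountAgg), ("rowtime", MinAgg)])
for row in [{"b": "ab", "rowtime": 5}, {"b": "aa", "rowtime": 3}, {"b": "aa", "rowtime": 3}]:
    logic.process(row)
first = logic.fire()

logic.process({"b": "aa", "rowtime": 7})
second = logic.fire()
```

For selecting distinct outputs instead of aggregates, the same `seen` check would guard an emit call rather than `accumulate`.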




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
