[ https://issues.apache.org/jira/browse/FLINK-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954739#comment-15954739 ]
radu commented on FLINK-6260:
-----------------------------

[~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] [~stefano.bortoli]

As per [~fhueske]'s suggestion in FLINK-6249, I have created a separate JIRA for the case of supporting distinct over group by.

> Distinct Aggregates for Group By Windows
> ----------------------------------------
>
>                 Key: FLINK-6260
>                 URL: https://issues.apache.org/jira/browse/FLINK-6260
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>
> Time target: ProcTime/EventTime
> SQL targeted query examples:
> ------------
> Q1. Boundaries are expressed in the GROUP BY clause and distinct is applied to
> the elements of the aggregate(s)
> `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime()
> TO HOUR)`
> Q2. Distinct is applied to the collection of outputs to be selected.
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY
> FLOOR(procTime() TO DAY)`
> => A DISTINCT operation makes sense only within the context of windows or some
> other bounded structure. Otherwise the operation would have to keep an
> infinite amount of data to ensure uniqueness and would not trigger for certain
> functions (e.g. aggregates).
> => We can follow the same design/implementation as for JIRA FLINK-6249
> (supporting Distinct Aggregates for OVER Windows).
> => We can consider the implementation of DISTINCT for select clauses as a
> sub-JIRA issue.
> => Aggregations over distinct elements without any boundary (i.e.
> within the SELECT clause) do not make sense, just as aggregations do not make
> sense without groupings or windows.
> If distinct is applied to group elements as in the Q1 example, then we either
> define a new implementation, if selection is general, or extend the current
> implementation of grouped aggregates with distinct group aggregates.
> If distinct is applied to all selected elements as in the Q2 example, then a
> new implementation needs to be defined.
> This would work over a specific
> window / processFunction, and the uniqueness of the results to be processed
> would be enforced within the processing function. This will happen for each
> partition. The data structure used to track distinct elements will be reset
> with each new window (i.e., group by scope).
>
> Examples
> ------------
> `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME TO HOUR) `
> `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO
> HOUR) `
> ||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
> |10:00:01| |(ab,1)| | |
> |10:05:00| |(aa,2)| | |
> |11:00:00| | |ab,aa|2|
> |11:03:00| |(aa,2)| | |
> |11:09:00| |(aa,2)| | |
> |12:00:00| | |aa|1|
> |...|
> Implementation option
> ---------------------
> Considering that the behavior is similar to the one implemented for over
> windows (with the difference that the distinction is reset for each
> group scope), the implementation will be done by reusing the existing
> implementation of the over window functions. Distinction can be achieved
> within the aggregate itself or within the window/processFunction logic that
> computes the aggregates. As multiple aggregates which require distinction can
> be computed at the same time, the preferred option is to create distinction
> within the process logic. For the case of selecting distinct outputs (i.e.,
> not aggregates) we can follow the same implementation design: support
> distinction in the aggregation and then emit only one output for each
> element seen (instead of calling the aggregate method of the aggregates).
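
The per-window distinct logic described under "Implementation option" can be illustrated with a minimal, language-neutral sketch. This is not Flink code and does not use any Flink API: `hour_window`, `distinct_per_window`, and the calendar date are hypothetical names and values chosen only for illustration, and a single partition with time-ordered input is assumed. The sketch keeps one "seen" structure per hourly group scope, emits the distinct values (Q1) and their count (Q2) when the window closes, and then resets the structure.

```python
from datetime import datetime


def hour_window(ts):
    """Return the start of the hourly window containing ts (FLOOR(... TO HOUR))."""
    return ts.replace(minute=0, second=0, microsecond=0)


def distinct_per_window(events):
    """Yield (window_start, distinct_values, distinct_count) per hourly window.

    events: iterable of (timestamp, b) pairs, assumed to arrive in time order
    and to belong to a single partition.
    """
    current = None
    seen = {}  # dict keeps insertion order; keys are the distinct values
    for ts, b in events:
        window = hour_window(ts)
        if current is not None and window != current:
            # The previous window is complete: emit its result and reset state.
            yield current, list(seen), len(seen)
            seen = {}
        current = window
        seen.setdefault(b, None)  # duplicates within the window are ignored
    if current is not None:
        # Flush the last open window at end of input.
        yield current, list(seen), len(seen)


# Input rows from the example table; the calendar date is arbitrary.
events = [
    (datetime(2017, 4, 4, 10, 0, 1), "ab"),
    (datetime(2017, 4, 4, 10, 5, 0), "aa"),
    (datetime(2017, 4, 4, 11, 3, 0), "aa"),
    (datetime(2017, 4, 4, 11, 9, 0), "aa"),
]
results = list(distinct_per_window(events))
for window, values, count in results:
    print(window.strftime("%H:%M"), values, count)
```

For the rows in the example table, the first window yields the values ab and aa with a count of 2, and the second window yields aa with a count of 1, matching the Q1 and Q2 columns. Keeping the "seen" structure in the window logic rather than inside each aggregate is what would let several distinct aggregates share it, which is the option the description prefers.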