Fabian Hueske created FLINK-8566:
------------------------------------

             Summary: Replace retract/insert of same record for state retention 
timer resets
                 Key: FLINK-8566
                 URL: https://issues.apache.org/jira/browse/FLINK-8566
             Project: Flink
          Issue Type: Improvement
          Components: Table API & SQL
    Affects Versions: 1.5.0
            Reporter: Fabian Hueske


Currently a simple query like {{SELECT DISTINCT a, b, c FROM tableX}} is 
translated into a plan that generates a retraction stream. However, one would 
assume that an append stream should be possible as well. In fact, the plan 
doesn't produce actual updates.

Internally, the {{DISTINCT}} is translated into a {{GROUP BY}} with all 
distinct fields being keys and no aggregation functions. The corresponding 
operator produces updates, because aggregation functions might update their 
results as new records are received. So we could just implement a dedicated 
operator for {{DISTINCT}}. However, this would not work if a user configures a 
state retention time. In this case, we emit retraction/insert messages for the 
same (distinct) record whenever a new row is received to reset the state 
clean-up timers of the downstream operators. 
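
To make the behavior above concrete, here is a minimal toy model of it. It is 
not Flink's actual operator: the class name {{DistinctRetractionSketch}}, the 
string encoding of messages ("+" for insert, "-" for retract), and the 
{{retentionEnabled}} switch are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical, not Flink code) of a DISTINCT evaluated as GROUP BY. */
public class DistinctRetractionSketch {
    // Distinct rows seen so far; stands in for the keyed operator state.
    private final Set<String> seen = new HashSet<>();
    // Assumption: a single switch models "a state retention time is configured".
    private final boolean retentionEnabled;

    public DistinctRetractionSketch(boolean retentionEnabled) {
        this.retentionEnabled = retentionEnabled;
    }

    /** Returns the messages emitted for one input row. */
    public List<String> process(String row) {
        List<String> out = new ArrayList<>();
        if (seen.add(row)) {
            // First occurrence: a plain insert, which an append stream could carry.
            out.add("+" + row);
        } else if (retentionEnabled) {
            // Repeated occurrence: the record itself is unchanged, but it is
            // retracted and re-inserted so downstream timers get reset.
            out.add("-" + row);
            out.add("+" + row);
        }
        // Repeated occurrence without retention: nothing is emitted.
        return out;
    }

    public static void main(String[] args) {
        DistinctRetractionSketch op = new DistinctRetractionSketch(true);
        System.out.println(op.process("(1, x, y)")); // [+(1, x, y)]
        System.out.println(op.process("(1, x, y)")); // [-(1, x, y), +(1, x, y)]
    }
}
```

In this model the retraction is the only thing that prevents the output from 
being an append stream, although the emitted record never changes.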

One way to solve this issue is to implement a dedicated mechanism to update state 
clean-up timers for unchanged records instead of sending out retraction/insert 
messages with identical records. This mechanism would just be used to reset the 
timers and could also be used for append streams. For example, we could replace 
the boolean flag in CRow with a byte that can take more than two values. 
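
A rough sketch of what a byte-valued flag could look like on the receiving 
side. Everything here is hypothetical: the constants {{INSERT}}, {{RETRACT}} 
and {{REFRESH}}, the class {{ChangeFlagSketch}}, and the map-based timer state 
are assumptions for illustration and do not reflect the actual CRow layout or 
any existing Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical downstream operator that understands a three-valued change flag. */
public class ChangeFlagSketch {
    // Assumed flag values; a boolean could only express the first two.
    public static final byte INSERT = 0;
    public static final byte RETRACT = 1;
    public static final byte REFRESH = 2; // reset the clean-up timer, data unchanged

    // Row -> time at which its state would be cleaned up.
    private final Map<String, Long> cleanupTime = new HashMap<>();
    private final long retentionMillis;
    // Counts messages that actually changed the result.
    private int dataChanges = 0;

    public ChangeFlagSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    public void onMessage(byte flag, String row, long now) {
        switch (flag) {
            case INSERT:
                cleanupTime.put(row, now + retentionMillis);
                dataChanges++;
                break;
            case RETRACT:
                cleanupTime.remove(row);
                dataChanges++;
                break;
            case REFRESH:
                // Only the timer moves; the result is not touched.
                if (cleanupTime.containsKey(row)) {
                    cleanupTime.put(row, now + retentionMillis);
                }
                break;
            default:
                throw new IllegalArgumentException("unknown flag: " + flag);
        }
    }

    /** Clean-up time of a row, or -1 if the row is not in state. */
    public long cleanupTimeOf(String row) {
        Long t = cleanupTime.get(row);
        return t == null ? -1L : t;
    }

    public int dataChanges() {
        return dataChanges;
    }

    public static void main(String[] args) {
        ChangeFlagSketch op = new ChangeFlagSketch(1000L);
        op.onMessage(INSERT, "(1, x, y)", 0L);
        op.onMessage(REFRESH, "(1, x, y)", 500L);
        System.out.println(op.cleanupTimeOf("(1, x, y)")); // 1500
        System.out.println(op.dataChanges());              // 1
    }
}
```

With such a flag, a repeated distinct record would cost one message instead of 
a retract/insert pair, and the stream would contain no retractions.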



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)