Hi Henkka,

This should be fairly easy to implement in a ProcessFunction. You're right to worry about the number of timers, though. If you register a timer multiple times for the same time, the timer is deduplicated ;-) and will only fire once for that time. That's also why the state retention time lets you set a minimum and a maximum time. With that, you only have to set a new timer once every (max - min) interval. For example, if you say the application should keep the state for at least 12 hours but at most 14 hours, you only need to register a new timer every 2 hours per key.
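Roughly, a sketch of that pattern could look like the following. This is untested and the generic element type, state names, and the 12/14 hour constants are only placeholders; it uses processing-time timers, which is also what the SQL state retention does if I recall correctly. It would be applied on a keyed stream, e.g. stream.keyBy(...).process(new DedupFunction<>()).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Emits an element the first time its key is seen and clears the key's state
// once the key has not been seen for at least MIN_RETENTION (and at most MAX_RETENTION).
public class DedupFunction<T> extends ProcessFunction<T, T> {

    private static final long MIN_RETENTION = 12 * 60 * 60 * 1000L; // 12 hours
    private static final long MAX_RETENTION = 14 * 60 * 60 * 1000L; // 14 hours

    // timestamp of the clean-up timer currently registered for this key
    private transient ValueState<Long> cleanupTime;
    // marker that this key has already been emitted
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        cleanupTime = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cleanup-time", Long.class));
        seen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) {
            // first time we see this key -> emit it
            seen.update(true);
            out.collect(value);
        }

        long now = ctx.timerService().currentProcessingTime();
        Long registered = cleanupTime.value();
        // only register a new timer if the currently registered one would fire
        // before the minimum retention of this element has passed,
        // i.e. at most one new timer per key every (MAX_RETENTION - MIN_RETENTION)
        if (registered == null || registered < now + MIN_RETENTION) {
            long newCleanupTime = now + MAX_RETENTION;
            ctx.timerService().registerProcessingTimeTimer(newCleanupTime);
            cleanupTime.update(newCleanupTime);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out) throws Exception {
        // only clean up when the latest registered timer fires;
        // earlier, superseded timers fire but do nothing
        Long registered = cleanupTime.value();
        if (registered != null && timestamp == registered) {
            seen.clear();
            cleanupTime.clear();
        }
    }
}

On the Table API side, if I remember correctly the query config also accepts both values directly, e.g. qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(14)), instead of the single-argument variant that sets min = max.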
Hope this helps,
Fabian

2018-02-06 15:47 GMT+01:00 Henri Heiskanen <henri.heiska...@gmail.com>:

> Hi,
>
> Thanks.
>
> Doing this deduplication would be easy just by using the vanilla Flink API and state (check if this is a new key and then emit), but the issue has been automatic state cleanup. However, it looks like this streaming SQL retention time implementation uses a process function and timers. I was a bit reluctant to use that because I was worried that the approach would be overkill with our volumes, but maybe it will work just fine. Can you help me a bit with how to implement it efficiently?
>
> Basically we get an estimated 20M distinct rows/keys and roughly 300 events per key during one day. What I would like to do is to clear the state for a specific key if I have not seen that key for the last 12 hours. I think it's very close to the example here: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html. Instead of emitting the data in onTimer I would just clear the state. In the example, each tuple invokes registerEventTimeTimer(). Is this the correct pattern? E.g. in our case we could get hundreds of events with the same key during a few minutes, so would we then register hundreds of timer instances?
>
> Br,
> Henkka
>
> On Tue, Feb 6, 2018 at 3:45 PM, Fabian Hueske <fhue...@apache.org> wrote:
>
>> Hi Henri,
>>
>> thanks for reaching out and providing code and data to reproduce the issue.
>>
>> I think you are right, a "SELECT DISTINCT a, b, c FROM X" should not result in a retraction stream.
>>
>> However, with the current implementation we internally need a retraction stream if a state retention time is configured.
>> The reason lies in how the state retention time is defined: it removes the state for a key if that key hasn't been seen for x time.
>> This means that an operator resets the state clean-up timer of a key whenever a new record with that key is received. This is also true for retraction / insertion messages of the same record.
>> If we implemented the GroupBy that performs the DISTINCT as an operator that emits an append stream, downstream operators wouldn't see any updates, because the GroupBy only emits the first record and filters out all duplicates. Hence, downstream operators would perform the clean-up too early.
>>
>> I see that these are internals that users should not need to worry about, but right now there is no easy solution to this.
>> Eventually, the clean-up timer reset should be implemented differently than by retracting and re-inserting the same record. However, this would be a more involved change and requires good planning.
>>
>> I'll file a JIRA for that.
>>
>> Thanks again for bringing the issue to our attention.
>>
>> Best, Fabian
>>
>>
>> 2018-02-06 13:59 GMT+01:00 Timo Walther <twal...@apache.org>:
>>
>>> Hi Henri,
>>>
>>> I just noticed that I had a tiny mistake in my little test program, so SELECT DISTINCT is officially supported. But the question whether this is a valid append stream is still up for discussion. I will loop in Fabian (in CC).
>>>
>>> For the general behavior you can also look into the code and especially the comments there [1].
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
>>>
>>>
>>> Am 2/6/18 um 1:36 PM schrieb Timo Walther:
>>>
>>> Hi Henri,
>>>
>>> I'll try to answer your questions:
>>>
>>> 1) You are right, SELECT DISTINCT should not need a retract stream. Internally, this is translated into an aggregation without an aggregate function call. So this definitely needs improvement.
>>>
>>> 2) The problem is that SELECT DISTINCT is neither officially supported nor tested. I opened an issue for this [1].
>>>
>>> Until this issue is fixed, I would recommend implementing a custom aggregate function that keeps track of the values seen so far [2].
>>>
>>> Regards,
>>> Timo
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-8564
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/udfs.html#aggregation-functions
>>>
>>>
>>> Am 2/6/18 um 11:11 AM schrieb Henri Heiskanen:
>>>
>>> Hi,
>>>
>>> I have a use case where I would like to find distinct rows over a certain period of time. The requirement is that a new row is emitted as soon as possible; otherwise the requirement is mainly to filter out data to get a smaller dataset for downstream. I noticed that SELECT DISTINCT with a state retention time of 12 hours would in theory do the trick. You can find the code below. A few questions:
>>>
>>> 1) Why is SELECT DISTINCT creating a retract stream? In which scenarios would we get update/delete rows?
>>>
>>> 2) If I run the code below with the example data (also below) without the state retention config, I get the two append rows (expected). If I run exactly the code below (with the retention config), I get two appends and one delete for AN1234 and then one append for AN5555. What is going on?
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>
>>> StreamQueryConfig qConfig = tableEnv.queryConfig();
>>> // set idle state retention time: min = max = 12 hours
>>> qConfig.withIdleStateRetentionTime(Time.hours(12));
>>>
>>> // create a TableSource
>>> CsvTableSource csvSource = CsvTableSource
>>>     .builder()
>>>     .path("data.csv")
>>>     .field("ts", Types.SQL_TIMESTAMP())
>>>     .field("aid1", Types.STRING())
>>>     .field("aid2", Types.STRING())
>>>     .field("advertiser_id", Types.STRING())
>>>     .field("platform_id", Types.STRING())
>>>     .fieldDelimiter(",")
>>>     .build();
>>>
>>> tableEnv.registerTableSource("CsvTable", csvSource);
>>>
>>> Table result = tableEnv.sqlQuery(
>>>     "SELECT DISTINCT aid1, aid2, advertiser_id, platform_id FROM CsvTable");
>>>
>>> StdOutRetractStreamTableSink out = new StdOutRetractStreamTableSink(
>>>     new String[] {"aid1", "aid2", "advertiser_id", "platform_id"},
>>>     new TypeInformation[] {Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()});
>>>
>>> result.writeToSink(out, qConfig);
>>>
>>> env.execute();
>>>
>>>
>>> Here is a simple CSV dataset of three rows:
>>>
>>> 2018-01-31 12:00:00,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>> 2018-01-31 12:00:02,AN1234,RC1234,0000-0000-0000-00000,1234-1234-1234-1234,1234567890
>>> 2018-01-31 12:00:02,AN5555,RC5555,0000-0000-0000-00001,1234-1234-1234-1234,1234567891