[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776082#comment-17776082 ] tanjialiang commented on FLINK-29692: -

Hi everyone, I notice that there is no solution yet, so I want to share my thoughts about this feature. Maybe it can help.

I think supporting early-fire may not be the best solution for the current window functions, because every window trigger is expensive, and early-fire is still not a real-time trigger. For example:

{code:sql}
SET table.exec.emit.early-fire.enabled = true;
SET table.exec.emit.early-fire.delay = 1min;

SELECT
  user_id,
  COUNT(*) AS total,
  HOP_START(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_start,
  HOP_END(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_end
FROM user_click
GROUP BY user_id, HOP(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR);
{code}

1. Whether we use a HOP/TUMBLE/CUMULATE window or enable early-fire, there is always a time delay, so the results are not real-time enough.
2. When the cardinality of user_id is large, every window trigger is very expensive, which can make the job unstable and easily cause checkpoint timeouts.
3. Every early-fire triggers the windows of all user_ids, but maybe only a small part of the data actually changed during that early-fire interval, which can put write pressure on the sink.

In my company, I've added window TVF functions for this case, named HOPv2/TUMBLEv2 (maybe the names are not suitable for the community).

{code:sql}
SELECT
  user_id,
  COUNT(*) AS total,
  window_start,
  window_time, -- the record rowtime
  window_end
FROM TABLE(
  HOPV2(
    DATA => TABLE user_click,
    TIMECOL => DESCRIPTOR(rowtime),
    SLIDE => INTERVAL '24' HOUR,
    SIZE => INTERVAL '48' HOUR,
    ALLOWED_LATENESS => true))
GROUP BY user_id, window_start, window_time, window_end;
{code}

1. Similar to the OVER window, we accumulate and emit the result when a record arrives (actually on a timer trigger), so results are emitted in real time without putting much write pressure on the sink.
2. window_time is the record's rowtime, which represents the current progress.
3. Similar to the HOP window, we fire and purge when window_end arrives.
4. We support an allowedLateness option: when processing a late event whose window has not been purged yet, we accumulate it without emitting.

I would like to contribute it, but it may need more discussion and help because I am still new to contributing to Flink.

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Affects Versions: 1.15.3
> Reporter: Canope Nerda
> Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle
> late arriving data to achieve eventual correctness. In the logic, I need to
> do window deduplication which is based on Windowing TVFs and according to
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts with 2). Without early/late fires, we had to
> compromise, either live with fresh incorrect data or tolerate excess latency
> for correctness.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
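To make the write-pressure point (item 3 of the first list) concrete, here is a toy Python model, not Flink code, that counts how many rows reach the sink under a periodic early-fire trigger versus per-record emission. It assumes, as the comment describes, that every early fire re-emits the current result of every key; the function names and event data are made up for illustration.

```python
from collections import defaultdict


def early_fire_emits(events, interval):
    """Toy periodic early-fire: at each `interval` boundary crossed by the
    input, re-emit the current count of every key seen so far (assumption)."""
    counts = defaultdict(int)
    emitted = []
    next_fire = interval
    for ts, key in sorted(events):
        # Fire once for every interval boundary that passed before this record.
        while ts >= next_fire:
            emitted.extend((next_fire, k, c) for k, c in sorted(counts.items()))
            next_fire += interval
        counts[key] += 1
    return emitted


def per_record_emits(events):
    """Toy per-record emission: only the key that just changed is emitted."""
    counts = defaultdict(int)
    emitted = []
    for ts, key in sorted(events):
        counts[key] += 1
        emitted.append((ts, key, counts[key]))
    return emitted


# Five users click once at the start, then only user "a" keeps clicking.
events = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (4, "e"), (70, "a"), (130, "a")]
print(len(early_fire_emits(events, 60)))  # 10 rows: all 5 keys at t=60 and t=120
print(len(per_record_emits(events)))      # 7 rows: one per input record
```

In this model the early-fire cost per fire grows with the number of keys, while per-record emission grows only with the number of records that actually changed something.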
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718710#comment-17718710 ] Charles Tan commented on FLINK-29692: - [~jark] your point makes sense. However, it can be cumbersome to come up with workarounds for every use case that would benefit from an early fire feature – imagine if you have 100 ksqlDB workloads that you are trying to migrate to Flink. While many use cases can be supported through Flink's existing features, it would still be nice for Flink to support early fire windows with window TVF. Out of curiosity, what are the difficult parts of supporting early fire on window TVF? From what I understand, Flink already supports window triggers in the DataStream API and the old windowing functions already supported this configuration. I am not very familiar with the implementation details of SQL operators in Flink, but if there was a list of tasks relevant to supporting this feature maybe I could lend a hand in taking a few of those items on.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715607#comment-17715607 ] Jark Wu commented on FLINK-29692: - I think early fire / late arrival is a great feature, but I just want to explore if we have other or better solutions for the use case because supporting early fire / late arrival on the new window TVF might not be easy.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715604#comment-17715604 ] Jark Wu commented on FLINK-29692: - Hi [~charles-tan], yes, they have some differences. Group Aggregate doesn't expire state the way a window does, but you can enable state TTL to expire state. Regarding hopping windows, you can implement a UDTF that splits a record into multiple records, one per window the record belongs to, and then apply a group aggregate on those windows.
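A toy Python sketch of the UDTF-plus-group-aggregate idea, using the "1 hour every 30 minutes" hopping window from Charles Tan's comment (times in minutes). `hop_windows` plays the role of the hypothetical UDTF that explodes one record into one row per window; a real implementation would be a Flink `TableFunction`, which is not shown here. Windows are assumed to be aligned to time 0, and the size is assumed to be a multiple of the slide.

```python
from collections import Counter


def hop_windows(ts, slide, size):
    """Return every [start, end) hopping window containing `ts`.
    Assumes windows are aligned to 0 and size is a multiple of slide."""
    last_start = ts - ts % slide
    first_start = last_start - size + slide
    return [(s, s + size) for s in range(first_start, last_start + 1, slide)]


def hop_counts(events, slide, size):
    """Explode each (ts, key) into its windows, then count per (key, window),
    i.e. what GROUP BY key, window_start, window_end would compute."""
    counts = Counter()
    for ts, key in events:
        for window in hop_windows(ts, slide, size):
            counts[(key, window)] += 1
    return counts


# 1-hour windows sliding every 30 minutes, times in minutes.
print(hop_windows(45, slide=30, size=60))  # [(0, 60), (30, 90)]
print(hop_counts([(10, "u"), (45, "u")], slide=30, size=60))
```

Because this is a plain group aggregate, it updates on every record, but its state is only cleaned up by state TTL, not by the watermark.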
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715100#comment-17715100 ] Chalres Tan commented on FLINK-29692: - re [~jark]: Thanks for the response. The example query you provided would yield the correct results, but from my understanding, with window TVF the state associated with a window is cleared after the window expires. With the example you provided, is it true that the state will just accumulate over time? Also, the approach in the query won't work if the use case is adjusted slightly, for example if the tumbling windows were 2 hours long instead of 1 hour, or if we used hopping windows of length 1 hour every 30 minutes instead of tumbling windows.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714985#comment-17714985 ] Jark Wu commented on FLINK-29692: - Hi [~charles-tan], thank you for sharing your use case. I'm just curious whether it is possible to support your use case by using Group Aggregate instead of Window Aggregate? For example:
{code}
SELECT `user`, COUNT(*) AS cnt
FROM withdrawal
GROUP BY `user`, DATE_FORMAT(withdrawal_timestamp, 'yyyy-MM-dd HH:00') -- trim into hour
HAVING COUNT(*) >= 3
{code}
IIUC, this can also achieve "notified if a withdrawal from a bank account happens 3 times in an hour" ASAP. And you may get better performance from the tuning[1].

[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
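A toy Python model (not Flink code) of what the suggested query computes: an unbounded count per (user, hour bucket) that reports as soon as the count reaches 3. The bucketing mirrors `DATE_FORMAT(..., 'yyyy-MM-dd HH:00')`; the function name, the date, and the data are made up. The second case shows the limitation raised in reply: three withdrawals that straddle an hour boundary are not caught by fixed hour buckets.

```python
from collections import defaultdict
from datetime import datetime


def fraud_alerts(withdrawals, threshold=3):
    """withdrawals: (user, datetime) in arrival order. Emits a row each time a
    (user, hour) count is updated to a value at or above the threshold."""
    counts = defaultdict(int)  # grows forever unless expired, e.g. by state TTL
    alerts = []
    for user, ts in withdrawals:
        bucket = ts.strftime("%Y-%m-%d %H:00")  # trim into hour
        counts[(user, bucket)] += 1
        if counts[(user, bucket)] >= threshold:
            alerts.append((user, bucket, counts[(user, bucket)]))
    return alerts


def at(hhmm):
    return datetime.strptime("2023-04-20 " + hhmm, "%Y-%m-%d %H:%M")


same_hour = [("alice", at("10:05")), ("alice", at("10:20")), ("alice", at("10:50"))]
print(fraud_alerts(same_hour))  # [('alice', '2023-04-20 10:00', 3)]

straddling = [("bob", at("10:50")), ("bob", at("11:05")), ("bob", at("11:10"))]
print(fraud_alerts(straddling))  # []: fixed hour buckets miss this case
```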
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815 ] Aitozi commented on FLINK-29692: hi, sorry for jumping into this discussion. I want to share two thoughts about this feature.
* As [~jark] mentioned, early/late fire is already supported in group window aggregation. Window TVF is a feature-rich version of group window aggregation, so I think this capability should be aligned.
* There is a difference between early/late fire and the cumulate window TVF: early/late fire relates to the window trigger, whereas the cumulate window relates to the window assigner. We could use a cumulate window to simulate the same functionality, but it may bring overhead, as [~charles-tan] said.

So, +1 from my side to support an emit strategy in window TVF.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713726#comment-17713726 ] Chalres Tan commented on FLINK-29692: - hi [~martijnvisser], there are many use cases that I believe can benefit from a feature like this. One example is fraud detection. Let's say you want to be notified if a withdrawal from a bank account happens 3 times in an hour. With a 1-hour TUMBLE window, it isn't acceptable to have to wait an hour for the window to end before realizing that potential fraud has happened. Of course, a CUMULATE window can reduce this delay, but as I mentioned earlier, we would be opening many more windows depending on how often we want to emit an intermediate result, which can be resource intensive. I think supporting an early fire configuration for the newer TVFs, as was supported with the older windowing functions, would be a more streamlined way of supporting use cases like these. A secondary point is that windows in ksqlDB have early fire by design (they struggle in the opposite way, in that they don't easily support emitting one correct result per window). Supporting an early fire configuration makes it much easier for users who are trying to migrate their use cases from ksqlDB to Flink.
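A rough way to see the cost concern: in a naive model where each record is added to every CUMULATE window that covers it, a 1-hour window with a 1-minute step means a record can belong to up to 60 windows, while a TUMBLE window plus early fire keeps a single window per key. This toy Python sketch (times in minutes, windows aligned to 0) only counts those windows. Flink's actual window operator may share work between windows, so treat the numbers as an upper bound for a naive approach, not a measurement.

```python
def cumulate_windows(ts, step, max_size):
    """All [start, end) CUMULATE windows containing ts: every window shares
    the same start, and the ends grow by `step` up to `max_size`."""
    start = ts - ts % max_size
    first_end = ts - ts % step + step
    return [(start, end) for end in range(first_end, start + max_size + 1, step)]


print(len(cumulate_windows(0, step=1, max_size=60)))  # 60
print(len(cumulate_windows(5, step=1, max_size=60)))  # 55
print(cumulate_windows(5, step=15, max_size=60))      # [(0, 15), (0, 30), (0, 45), (0, 60)]
```

Smaller steps give fresher intermediate results but proportionally more windows, which is the trade-off described in the comment.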
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712941#comment-17712941 ] Martijn Visser commented on FLINK-29692: [~charles-tan] Could you also elaborate on your use case for this?
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711990#comment-17711990 ] Martijn Visser commented on FLINK-29692: [~charles-tan] I don't think there's consensus yet on how this would/should work properly.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711958#comment-17711958 ] Chalres Tan commented on FLINK-29692: - Hi all, I notice this thread has gotten a little bit stale and was hoping to continue this conversation. I also have a use case that would benefit from the `emit.early-fire.enabled` configuration working with window TVF. Using cumulate window helps our use case, but cumulate window can be quite expensive if you have a small step size. Are there any plans for this issue?
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623595#comment-17623595 ] Jakub Partyka commented on FLINK-29692: --- {quote}Could you describe the demand in detail [~canopenerda] [~jjpar] , thanks a lot. Let's see if there are other better plans which could satisfy your demands, such as [cumulate window tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]?{quote} In my case I have a source that has event time and watermark time in different columns. The watermark column guarantees that all events with event time <= watermark time have arrived. But this watermark has a much greater delay, and we are working with minute-sized windows. Therefore we use early fire to emit intermediate results without waiting for the watermark.

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / Planner
> Affects Versions: 1.15.2
> Reporter: Canope Nerda
> Priority: Major
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623481#comment-17623481 ] Benchao Li commented on FLINK-29692: {quote}'Early fire' would periodically send intermediate results downstream; the frequency of sending intermediate results is based on processing time, even though the window itself is based on event time. It mixes up processing-time semantics with the time attribute of the window.{quote} I agree, it's not perfect. {quote}If you need to retire window state and send intermediate results, how about trying "[cumulate window tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]" instead of 'early fire'?{quote} A cumulate window could cover most cases, except that its semantics differ from the others (the output of an unbounded aggregate, or of a window with early fire, is a changelog for the same grouping key).
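To illustrate the semantic difference for a single grouping key, here is a toy Python model (not Flink's operators): early fire keeps one result row per key and updates it as a changelog, whereas CUMULATE emits a new insert-only row per window end. The toy assumes an early fire is skipped when the result did not change; that detail is an assumption, not confirmed in this thread.

```python
def early_fire_changelog(batches):
    """One row per key, updated in place: +I first, then -U/+U pairs."""
    out, total, prev = [], 0, None
    for batch in batches:  # values that arrived between two early fires
        total += sum(batch)
        if prev is None:
            out.append(("+I", total))
        elif total != prev:
            out.extend([("-U", prev), ("+U", total)])
        prev = total
    return out


def cumulate_append(batches, start=0, step=1):
    """One insert-only row per cumulate window [start, start + i * step)."""
    out, total = [], 0
    for i, batch in enumerate(batches, start=1):
        total += sum(batch)
        out.append(("+I", start, start + i * step, total))
    return out


batches = [[1, 2], [3], []]
print(early_fire_changelog(batches))  # [('+I', 3), ('-U', 3), ('+U', 6)]
print(cumulate_append(batches))       # [('+I', 0, 1, 3), ('+I', 0, 2, 6), ('+I', 0, 3, 6)]
```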
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623154#comment-17623154 ] Jing Zhang commented on FLINK-29692: [~canopenerda] Thanks for sharing. Your requirement is a [deduplication|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication] on event time that keeps the first row. However, the time attribute currently has millisecond precision instead of microsecond, which I guess is also why you chose an early-fire window. How about using [topN|https://issues.apache.org/jira/browse/FLINK-23107]? Thanks to [FLINK-23107|https://issues.apache.org/jira/browse/FLINK-23107], top-1 performs almost as well as [deduplication|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication].
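A toy Python model of roughly what a top-1 "earliest row per unique id" query produces when it orders by a plain microsecond column (so the millisecond time-attribute limit does not apply): the first arrival is inserted, and a duplicate that arrives later but has an earlier timestamp replaces it through an update. The data, agent names, and function name are made up; this is not Flink's operator.

```python
def first_row_dedup(events):
    """events: (unique_id, event_time_us, agent) in arrival order.
    Keeps the row with the smallest event time per unique_id and returns
    the final rows plus an insert/update changelog."""
    best = {}
    changelog = []
    for uid, ts_us, agent in events:
        current = best.get(uid)
        if current is None:
            best[uid] = (ts_us, agent)
            changelog.append(("+I", uid, ts_us, agent))
        elif ts_us < current[0]:
            changelog.append(("-U", uid) + current)  # retract the old first row
            best[uid] = (ts_us, agent)
            changelog.append(("+U", uid, ts_us, agent))
    return best, changelog


events = [
    ("t1", 1_000_100, "agentA"),
    ("t1", 1_000_040, "agentB"),  # arrives later but happened earlier
    ("t2", 2_000_000, "agentA"),
    ("t1", 1_000_070, "agentC"),  # not earlier than agentB: ignored
]
best, changelog = first_row_dedup(events)
print(best)  # {'t1': (1000040, 'agentB'), 't2': (2000000, 'agentA')}
```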
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623143#comment-17623143 ] Jing Zhang commented on FLINK-29692: [~libenchao] Thanks for sharing. 'Early fire' periodically sends intermediate results downstream, and the frequency of sending them is based on processing time, even though the window itself is based on event time. It mixes processing-time semantics with the time attribute of the window. If you need to retire window state and send intermediate results, how about trying "[cumulate window tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]" instead of 'early fire'?
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623052#comment-17623052 ] Canope Nerda commented on FLINK-29692: -- [~jingzhang] Let me elaborate on my case a little. First of all, it's a high-frequency trading case. We deployed multiple agents globally to collect data from exchanges independently, so there are inherently duplicates. Each data entry has a field that uniquely identifies it from a business perspective, and an event time column in microseconds. The event time may vary for entries with the same unique column value, so we would like to keep the entry with the earliest event time for each unique column value.
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622950#comment-17622950 ] Benchao Li commented on FLINK-29692: Regarding "early-fire with window" versus "unbounded aggregate", I can share one point. Usually they can do the same thing, but they differ a little when it comes to retiring state:
# a window can retire state using the watermark, and drop late events
# an unbounded aggregate can only set a state TTL, and there is no way to drop late events

This is the most important reason why some use cases still prefer "early-fire with window".
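A toy Python comparison of the two points (not Flink code; one key, tumbling windows of size 10, watermark = max event time seen minus a fixed delay). The window version emits and drops a window's state once the watermark passes its end and discards late events for it; the unbounded version accepts every event, so its state only shrinks if something like state TTL removes it. All names and numbers are illustrative.

```python
from collections import defaultdict


def window_counts(events, size, max_delay):
    """Returns (emitted final counts, still-open state, dropped late events)."""
    state = defaultdict(int)  # open windows: window_start -> count
    emitted = {}              # windows already fired; their state is gone
    dropped = []
    watermark = float("-inf")
    for ts in events:
        start = ts - ts % size
        if start in emitted or start + size <= watermark:
            dropped.append(ts)  # late: its window has already been retired
        else:
            state[start] += 1
        watermark = max(watermark, ts - max_delay)
        for s in sorted(s for s in state if s + size <= watermark):
            emitted[s] = state.pop(s)  # fire, then retire the window state
    return emitted, dict(state), dropped


def unbounded_counts(events, size):
    """Group aggregate on the window start: late events are still counted."""
    counts = defaultdict(int)
    for ts in events:
        counts[ts - ts % size] += 1
    return dict(counts)


events = [1, 5, 12, 3, 25, 14]  # 3 and 14 arrive after the watermark passed their window end
print(window_counts(events, size=10, max_delay=2))  # ({0: 2, 10: 1}, {20: 1}, [3, 14])
print(unbounded_counts(events, size=10))            # {0: 3, 10: 2, 20: 1}
```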
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622940#comment-17622940 ] Jing Zhang commented on FLINK-29692: Sorry for the late response. I'm still wondering whether we really need early/late fire. As [~jark] said, early/late fire is supported in group window aggregation. When users enable early/late fire for a window aggregate, they accept that:
# the emitted results have no strict mapping to time
# intermediate results may be emitted
# a window result might be retracted later

If a user can accept the above conditions, why not use an unbounded aggregate directly? Otherwise, window aggregate would be the best option. Could you describe your requirements in detail [~canopenerda] [~jjpar]? Thanks a lot. Maybe there are other approaches that could satisfy your requirements, such as [cumulate window tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate].
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1764#comment-1764 ] Jakub Partyka commented on FLINK-29692: --- It would be really nice if Windowing TVFs supported early/late fires. For us early fire is a must, so we are forced to use the legacy group window aggregation, which lacks support for several optimizations: Split Distinct Aggregation and Local-Global Aggregation.
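For readers unfamiliar with Local-Global Aggregation: the general idea is to pre-aggregate within each input partition before the shuffle, so the global phase merges a few partial results instead of every raw record. A toy Python sketch of that two-phase idea follows; the partition data and function name are made up, and this does not model how Flink's planner actually applies the optimization.

```python
from collections import Counter


def local_global_count(partitions):
    """partitions: one list of keys per parallel input partition.
    Returns the global counts and how many rows crossed the 'shuffle'."""
    local = [Counter(keys) for keys in partitions]  # local phase: one partial per key
    shuffled_rows = sum(len(c) for c in local)      # rows sent to the global phase
    total = Counter()
    for partial in local:
        total.update(partial)                       # global phase merges partials
    return total, shuffled_rows


partitions = [["a", "a", "b"], ["a", "a", "a"]]
total, shuffled = local_global_count(partitions)
print(total, shuffled)  # 3 partial rows shuffled instead of 6 raw records
```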
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621435#comment-17621435 ] Canope Nerda commented on FLINK-29692:
Thanks [~jark] for suggesting the legacy group window aggregation. Is {{last_value}} in your example based on rowtime or proctime? Another problem in my case is that we require microsecond time precision, but time attributes only support millisecond precision so far.
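One possible workaround for the precision limit, sketched under assumptions: keep the raw microsecond value in a regular {{TIMESTAMP(6)}} column and derive a millisecond-precision event-time attribute from it for the watermark. The table name, the column names and the {{datagen}} connector are hypothetical placeholders. Windows and watermarks still operate at millisecond granularity; only the stored column keeps the microseconds.

{code:sql}
CREATE TABLE my_source_table (
  mykey    STRING,
  col_a    STRING,
  event_ts TIMESTAMP(6),                       -- raw value, microsecond precision
  rowtime AS CAST(event_ts AS TIMESTAMP(3)),   -- event-time attribute, millisecond precision
  WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'                      -- placeholder connector for illustration
);
{code}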
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621167#comment-17621167 ] Jark Wu commented on FLINK-29692:
I'm fine with supporting this feature. But we already support early/late fires for the legacy group window aggregation. You can use the {{last_value(col)}} aggregate function to get the window deduplication result. For example:
{code:sql}
-- output the window result every 5 seconds before the window fires
SET table.exec.emit.early-fire.enabled = true;
SET table.exec.emit.early-fire.delay = 5s;
-- emit the window result for every late-arriving record
SET table.exec.emit.late-fire.enabled = true;
SET table.exec.emit.late-fire.delay = 0;

SELECT
  mykey,
  TUMBLE_START(rowtime, INTERVAL '5' MINUTES),
  last_value(col_a),
  last_value(col_b)
FROM my_source_table
GROUP BY mykey, TUMBLE(rowtime, INTERVAL '5' MINUTES);
{code}
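For contrast, the window deduplication the issue asks about can be written with Windowing TVFs roughly as follows. This is a sketch that reuses the hypothetical {{my_source_table}}, {{mykey}}, {{col_a}}, {{col_b}} and {{rowtime}} names from Jark's legacy example. Unlike the legacy query, it emits each window's row only once, after the watermark passes the window end; the early/late fire options do not apply to it, which is exactly the gap this issue describes.

{code:sql}
-- Keep the latest row per key in each 5-minute tumbling window.
SELECT mykey, window_start, window_end, col_a, col_b
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, mykey
      ORDER BY rowtime DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE my_source_table, DESCRIPTOR(rowtime), INTERVAL '5' MINUTES))
)
WHERE rownum <= 1;
{code}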
[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs
[ https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620957#comment-17620957 ] Martijn Visser commented on FLINK-29692:
[~godfrey] [~jark] [~jingzhang] WDYT?