[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-10-17 Thread tanjialiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17776082#comment-17776082
 ] 

tanjialiang commented on FLINK-29692:
-

Hi everyone, I noticed that there is no solution yet, so I want to share my 
thoughts on this feature. Maybe they can help.
 
I don't think supporting early-fire is the best solution for the current window 
functions, because every window trigger is expensive and early-fire is still 
not a real-time trigger.

For example:
{code:sql}
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '1min';

SELECT user_id,
   COUNT(*) AS total,
   HOP_START(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_start,
   HOP_END(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR) AS window_end
FROM user_click
GROUP BY user_id, HOP(rowtime, INTERVAL '24' HOUR, INTERVAL '48' HOUR);
{code}
1. Whether you use a HOP/TUMBLE/CUMULATE window or enable early-fire, there is 
still a time delay, so the result is not real-time enough.

2. When the cardinality of user_id is large, every window trigger is very 
expensive, which makes the job unstable and makes checkpoint timeouts likely.

3. Every early-fire triggers the windows of all user_ids, even though perhaps 
only a small part of the data actually changed during that early-fire 
interval, which may put write pressure on the sink.

 
At my company, I've added a window TVF for this case, named HOPv2/TUMBLEv2 
(the name may not be a good fit for the community).
{code:sql}
SELECT user_id,
   COUNT(*) AS total,
   window_start,
   window_time, -- the record rowtime
   window_end
FROM TABLE(
   HOPV2(
      DATA => TABLE user_click,
      TIMECOL => DESCRIPTOR(rowtime),
      SLIDE => INTERVAL '24' HOUR,
      SIZE => INTERVAL '48' HOUR,
      ALLOWED_LATENESS => true))
GROUP BY user_id, window_start, window_time, window_end;
{code}
1. Similar to an OVER window, we accumulate and output the result as records 
arrive (the emit is actually driven by a timer), so the trigger is real-time 
and there is not much write pressure on the sink.
2. window_time is the record's rowtime, which represents the current progress.
3. Similar to a HOP window, we fire and purge when window_end arrives.
4. It supports an allowedLateness option: when a late event is processed and 
its window has not been purged yet, the event is accumulated without emitting.

 
I would like to contribute it, but it probably needs more discussion and help 
because I am still new to contributing to Flink.
 

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.3
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts with 2). Without early/late fires, we had to 
> compromise, either live with fresh incorrect data or tolerate excess latency 
> for correctness.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-05-02 Thread Charles Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17718710#comment-17718710
 ] 

Charles Tan commented on FLINK-29692:
-

[~jark] your point makes sense. However, it can be cumbersome to come up with 
workarounds for every use case that would benefit from an early fire feature – 
imagine if you have 100 ksqlDB workloads that you are trying to migrate to 
Flink. While many use cases can be supported through Flink's existing features, 
it would still be nice for Flink to support early fire windows with window TVF.

Out of curiosity, what are the difficult parts of supporting early fire on 
window TVF? From what I understand, Flink already supports window triggers in 
the DataStream API and the old windowing functions already supported this 
configuration. I am not very familiar with the implementation details of SQL 
operators in Flink, but if there was a list of tasks relevant to supporting 
this feature maybe I could lend a hand in taking a few of those items on.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-24 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715607#comment-17715607
 ] 

Jark Wu commented on FLINK-29692:
-

I think early fire / late arrival is a great feature, but I just want to 
explore if we have other or better solutions for the use case because 
supporting early fire / late arrival on the new window TVF might not be easy. 



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-24 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715604#comment-17715604
 ] 

Jark Wu commented on FLINK-29692:
-

Hi [~charles-tan], yes, they have some differences. Group Aggregate doesn't 
expire state the way the window does, but you can enable state TTL to expire 
state. Regarding the hopping windows, you can implement a UDTF that splits a 
record into multiple records, one per window it belongs to, and apply group 
aggregate on the windows. 
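
The UDTF idea above can be illustrated outside Flink. The following is only a 
minimal Python sketch of the assignment logic, not Flink code: the function 
name assign_hop_windows and the sample events are hypothetical, and 
timestamps are plain integers in one unit (here milliseconds). Each record is 
expanded into one row per hopping window that contains it, and a plain 
group-by count is then applied per (user, window).

```python
from collections import Counter


def assign_hop_windows(ts, size, slide):
    """Return every [start, end) hopping window that contains ts.

    All arguments are integers in the same unit (for example milliseconds).
    """
    last_start = ts - (ts % slide)  # start of the latest window containing ts
    windows = []
    start = last_start
    while start > ts - size:  # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= slide
    return windows


MINUTE = 60_000
# Hypothetical input: (user, event timestamp in ms)
events = [("alice", 5 * MINUTE), ("alice", 45 * MINUTE), ("bob", 50 * MINUTE)]

# "Group aggregate" on (user, window) after splitting each record
counts = Counter()
for user, ts in events:
    for window in assign_hop_windows(ts, size=60 * MINUTE, slide=30 * MINUTE):
        counts[(user, window)] += 1
```

With a 1 hour size and a 30 minute slide, every record lands in two windows, 
so the group aggregate sees twice as many input rows as the source produces.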



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-21 Thread Chalres Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17715100#comment-17715100
 ] 

Chalres Tan commented on FLINK-29692:
-

re [~jark]: Thanks for the response. The example query you provided would yield 
the correct results, but from my understanding with window TVF, after the 
window expires the state associated with that window is cleared. With the 
example you provided, is it true that the state will just accumulate over 
time? Also, the approach with the query won't work if the use case is adjusted 
slightly, for example if the windows were 2 hour tumbling windows instead of 
1 hour ones, or if instead of tumbling windows we had hopping windows with 
length 1 hour every 30 minutes.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-21 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714985#comment-17714985
 ] 

Jark Wu commented on FLINK-29692:
-

Hi [~charles-tan], thank you for sharing your use case. I'm just curious 
whether it is possible to support your use case by using Group Aggregate 
instead of Window Aggregate. For example: 

{code}
SELECT `user`, COUNT(*) AS cnt
FROM withdrawal
GROUP BY 
   `user`, 
   DATE_FORMAT(withdrawal_timestamp, 'yyyy-MM-dd HH:00') -- truncate to the hour
HAVING COUNT(*) >= 3
{code}

IIUC, this can also achieve "notified if a withdrawal from a bank account 
happens 3 times in an hour" ASAP. And you may get better performance from the 
tuning[1]. 


[1]: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-20 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17714815#comment-17714815
 ] 

Aitozi commented on FLINK-29692:


Hi, sorry for jumping into this discussion. I want to share two thoughts about 
this feature.
 * As [~jark] mentioned, early/late fire is already supported in group window 
aggregation. Window TVF is a feature-rich version of group window aggregation, 
so I think this ability should be aligned.
 * There is a difference between early/late fire and the cumulate window TVF. 
Early/late fire relates to the window trigger, while the cumulative window 
relates to the window assigner. We could use the cumulative window to simulate 
the same functionality, but it may bring overhead, as [~charles-tan] said.

So, +1 from my side to support an emit strategy in window TVF.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-18 Thread Chalres Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17713726#comment-17713726
 ] 

Chalres Tan commented on FLINK-29692:
-

hi [~martijnvisser], there are many use cases that I believe can benefit from a 
feature like this. One use case example would be fraud detection. Let's say you 
want to be notified if a withdrawal from a bank account happens 3 times in an 
hour. With a TUMBLING window of 1 hour, it isn't acceptable that you should 
need to wait an hour for the window to end before realizing that a potential 
fraud has happened. Of course CUMULATE window can reduce this delay, but like I 
mentioned above we would be opening many more windows depending on how often we 
want to emit an intermediate result which can be resource intensive. I think 
supporting an early fire configuration, as was supported with older windowing 
functions, for the newer TVFs would be a more streamlined way of supporting use 
cases like these.
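
To make the CUMULATE overhead mentioned above concrete, here is a small Python 
sketch (not Flink code) that lists the logical cumulate windows a single record 
falls into. The function name cumulate_windows is hypothetical, timestamps are 
integers in seconds, and max_size is assumed to be a multiple of step. It only 
counts logical windows; how a real runtime stores or shares their state is not 
modeled here.

```python
def cumulate_windows(ts, max_size, step):
    """Return every cumulate window [window_start, end) that contains ts.

    Windows share one start and grow by `step` until they reach `max_size`.
    Assumes max_size is a multiple of step; all values use the same unit.
    """
    window_start = ts - (ts % max_size)
    # First window end that lies strictly after ts
    first_end = window_start + ((ts - window_start) // step + 1) * step
    return [
        (window_start, end)
        for end in range(first_end, window_start + max_size + 1, step)
    ]


HOUR = 3600
# A record at the very start of a 1 hour window:
per_minute = len(cumulate_windows(0, HOUR, 60))      # 1 minute step
per_10_seconds = len(cumulate_windows(0, HOUR, 10))  # 10 second step
```

Under these assumptions a record at the start of the hour belongs to 60 logical 
windows with a 1 minute step and to 360 with a 10 second step, which is the 
growth the comment refers to when the step gets small.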

A secondary point is that windows in ksqldb have early fire by design (they 
struggle in the opposite way here in that they don't easily support emitting 
one correct result per window). Supporting an early fire configuration makes it 
much easier for users who are trying to migrate their use cases from ksqldb to 
Flink.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712941#comment-17712941
 ] 

Martijn Visser commented on FLINK-29692:


[~charles-tan] Could you also elaborate on your use case for this?



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-13 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711990#comment-17711990
 ] 

Martijn Visser commented on FLINK-29692:


[~charles-tan] I don’t think there’s consensus yet how this would/should work 
properly



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2023-04-13 Thread Chalres Tan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711958#comment-17711958
 ] 

Chalres Tan commented on FLINK-29692:
-

Hi all, I notice this thread has gotten a little bit stale and was hoping to 
continue this conversation. I also have a use case that would benefit from the 
`emit.early-fire.enabled` configuration working with window TVF. Using cumulate 
window helps our use case, but cumulate window can be quite expensive if you 
have a small step size. Are there any plans for this issue?



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-25 Thread Jakub Partyka (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623595#comment-17623595
 ] 

Jakub Partyka commented on FLINK-29692:
---

{quote}Could you describe the demand in detail [~canopenerda] [~jjpar] , thanks 
a lot. 

Let's see if there are other better plans which could satisfy your demands, 
such as [cumulate window 
tvf?|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]
{quote}
In my case I have a source that has event time and watermark time in different 
columns. The watermark column guarantees that all events with event time <= 
watermark time have arrived. But this watermark has a greater delay, and we are 
working with minute-sized windows. Therefore we use early fire to emit 
intermediate results without waiting for the watermark.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-24 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623481#comment-17623481
 ] 

Benchao Li commented on FLINK-29692:


{quote}'Early fire' would periodically send intermediate results downstream. 
The frequency of sending intermediate results is based on processing time, even 
though the window itself is based on event time. It mixes processing time 
semantics with the time attribute of the window.
{quote}
I agree, it's not perfect.
{quote}If you need to retire window state and send intermediate results, how 
about trying "[cumulate window 
tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]" 
instead of 'early fire'?
{quote}
Cumulate window could cover most cases, except that its semantics are different 
from the others (the output of an unbounded aggregate and of a window with 
early-fire is a changelog for the same grouping key).



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-24 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623154#comment-17623154
 ] 

Jing Zhang commented on FLINK-29692:


[~canopenerda] Thanks for sharing. 

The demand is a 
[deduplication|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication]
 on event time that keeps the first row. But the time attribute currently has 
millisecond precision instead of microsecond. I guess that is also the reason 
why you chose the early-fire window.

How about using [topN|https://issues.apache.org/jira/browse/FLINK-23107]? Top-1 
performs almost as well as 
[deduplication|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/deduplication]
 because of [FLINK-23107|https://issues.apache.org/jira/browse/FLINK-23107].



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-24 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623143#comment-17623143
 ] 

Jing Zhang commented on FLINK-29692:


[~libenchao] Thanks for sharing.

'Early fire' would periodically send intermediate results downstream. The 
frequency of sending intermediate results is based on processing time, even 
though the window itself is based on event time. It mixes processing time 
semantics with the time attribute of the window.

If you need to retire window state and send intermediate results, how about 
trying "[cumulate window 
tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate]" 
instead of 'early fire'?



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-24 Thread Canope Nerda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17623052#comment-17623052
 ] 

Canope Nerda commented on FLINK-29692:
--

[~jingzhang] Let me elaborate on my case a little bit.

First of all, it's a case in high frequency trading. We deployed multiple 
agents globally to individually collect data from exchanges, so inherently 
there are duplicates. Each data entry has a field that uniquely identifies 
the data from a business perspective, and an event time column in microseconds. 
The event time may vary for entries with the same unique column value, so we 
would like to keep the entry with the earliest event time for each unique 
column value.
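
The requirement described above (emit as soon as possible, then correct the 
result if an earlier entry shows up) can be sketched in a few lines of Python. 
This is only an illustration of the desired changelog behaviour, not how Flink 
implements deduplication: the function name dedup_earliest, the row layout 
(unique id, event time, payload) and the "+I"/"-U"/"+U" labels are assumptions 
made for this sketch.

```python
def dedup_earliest(rows):
    """Yield (op, row) changes that keep the earliest-event-time row per id.

    rows: iterable of (unique_id, event_time, payload) in arrival order.
    """
    best = {}  # unique_id -> (event_time, payload) of the current winner
    for uid, event_time, payload in rows:
        current = best.get(uid)
        if current is None:
            # First entry for this id: emit immediately
            best[uid] = (event_time, payload)
            yield ("+I", (uid, event_time, payload))
        elif event_time < current[0]:
            # An earlier entry arrived later: retract and replace
            yield ("-U", (uid, *current))
            best[uid] = (event_time, payload)
            yield ("+U", (uid, event_time, payload))
        # Entries with a later or equal event time are duplicates and ignored


# Hypothetical feed: three agents report the same trade with different times
feed = [("t1", 105, "agentA"), ("t1", 101, "agentB"), ("t1", 110, "agentC")]
changes = list(dedup_earliest(feed))
```

Note that this sketch keeps state for every id forever; bounding that state is 
exactly what a window (or a state TTL) would add.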



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-23 Thread Benchao Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622950#comment-17622950
 ] 

Benchao Li commented on FLINK-29692:


Regarding "early-fire with window" versus "unbounded aggregate", I can share 
one point. Usually they can do the same thing, but when it comes to retiring 
state, they are a little different. 
 # a window can retire state using the watermark, and drop late events
 # for an unbounded aggregate, we can only set a state TTL, and there is no way 
to drop late events

This is the most important reason why some use cases still prefer to use 
"early-fire with window".
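
The first point can be sketched as follows. This is a simplified Python model, 
not Flink's operator: the class name TumblingCounter is hypothetical, there is 
no allowed lateness, and a window is treated as closed once the watermark 
reaches its end. It shows the two behaviours a watermark gives that a plain 
state TTL does not: late events are dropped, and state is purged when the 
window closes.

```python
class TumblingCounter:
    """Count events per (key, tumbling window) and retire state by watermark."""

    def __init__(self, size):
        self.size = size
        self.watermark = float("-inf")
        self.state = {}   # (key, window_end) -> count
        self.dropped = 0  # number of late events that were discarded

    def on_event(self, key, ts):
        window_end = ts - (ts % self.size) + self.size
        if window_end <= self.watermark:
            # Late event: its window is already closed, so drop it
            self.dropped += 1
            return
        slot = (key, window_end)
        self.state[slot] = self.state.get(slot, 0) + 1

    def on_watermark(self, watermark):
        """Advance the watermark, emit and purge every closed window."""
        self.watermark = watermark
        closed = sorted(slot for slot in self.state if slot[1] <= watermark)
        return [(slot, self.state.pop(slot)) for slot in closed]


counter = TumblingCounter(size=60)
counter.on_event("a", 10)
counter.on_event("a", 20)
fired = counter.on_watermark(60)  # window [0, 60) closes, its state is removed
counter.on_event("a", 30)         # arrives after the watermark: dropped
```

An unbounded aggregate with only a state TTL has no equivalent of the 
`window_end <= self.watermark` check, so the late event would be counted 
(possibly into freshly recreated state) instead of being dropped.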



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-23 Thread Jing Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17622940#comment-17622940
 ] 

Jing Zhang commented on FLINK-29692:


Sorry for the late response.

I'm still wondering whether we really need early fire/late fire.

As [~jark] said, early/late fires are supported in group window aggregation. 
When users enable early/late fires for a window aggregate, they accept that:
 # the emitted result has no strict mapping to time
 # intermediate results may be emitted
 # the window result might be retracted later

If a user can accept the above conditions, why not use an unbounded aggregate 
directly? Otherwise, window aggregate would be the best option. 

Could you describe the demand in detail, [~canopenerda] [~jjpar]? Thanks a lot. 

Maybe there are other plans that could satisfy your demands, such as 
[cumulate window 
tvf|https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sql/queries/window-tvf/#cumulate].

> Support early/late fires for Windowing TVFs
> ---
>
> Key: FLINK-29692
> URL: https://issues.apache.org/jira/browse/FLINK-29692
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / Planner
>Affects Versions: 1.15.2
>Reporter: Canope Nerda
>Priority: Major
>
> I have cases where I need to 1) output data as soon as possible and 2) handle 
> late arriving data to achieve eventual correctness. In the logic, I need to 
> do window deduplication which is based on Windowing TVFs and according to 
> source code, early/late fires are not supported yet in Windowing TVFs.
> Actually 1) contradicts with 2). Without early/late fires, we had to 
> compromise, either live with fresh incorrect data or tolerate excess latency 
> for correctness.





[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-21 Thread Jakub Partyka (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1764#comment-1764
 ] 

Jakub Partyka commented on FLINK-29692:
---

It would be really nice if Windowing TVFs supported early/late fires.

For us early fire is a must, so we are forced to use the legacy group window 
aggregation, which lacks support for several optimizations: Split Distinct 
Aggregation and Local-Global Aggregation.
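
For reference, a minimal sketch of how these two optimizations are usually switched on. The option names are taken from Flink's performance tuning documentation; the concrete values are assumptions, and whether the options take effect depends on the Flink version and on the aggregation operator the planner chooses.

{code:sql}
-- Split Distinct Aggregation: rewrite a skewed COUNT(DISTINCT ...) into two levels
SET table.optimizer.distinct-agg.split.enabled = true;

-- Local-Global Aggregation: pre-aggregate locally before the shuffle
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

-- For unbounded (non-window) aggregates, local-global also requires mini-batch
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 5s;
SET table.exec.mini-batch.size = 5000;
{code}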



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-20 Thread Canope Nerda (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621435#comment-17621435
 ] 

Canope Nerda commented on FLINK-29692:
--

Thanks [~jark] for suggesting the legacy group window aggregation. Is the last 
value in your example determined by rowtime or by proctime? Another problem in 
my case is that we require microsecond time precision, but time attributes 
only support millisecond precision so far.



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-20 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17621167#comment-17621167
 ] 

Jark Wu commented on FLINK-29692:
-

I'm fine with supporting this feature. But we already support early/late fires 
for the legacy group window aggregation. You can use the {{last_value(col)}} 
aggregate function to get the window deduplication result. For example:

{code:sql}
-- emit the window result every 5 seconds before the window fires
SET table.exec.emit.early-fire.enabled = true;
SET table.exec.emit.early-fire.delay = 5s;
-- emit the window result for every late-arriving record
SET table.exec.emit.late-fire.enabled = true;
SET table.exec.emit.late-fire.delay = 0;

SELECT TUMBLE_START(rowtime, INTERVAL '5' MINUTES),
       last_value(col_a),
       last_value(col_b)
FROM my_source_table
GROUP BY mykey, TUMBLE(rowtime, INTERVAL '5' MINUTES);
{code}



[jira] [Commented] (FLINK-29692) Support early/late fires for Windowing TVFs

2022-10-20 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-29692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17620957#comment-17620957
 ] 

Martijn Visser commented on FLINK-29692:


[~godfrey] [~jark] [~jingzhang] WDYT?
