RE: Notify on 0 events in a Tumbling Event Time Window

2022-05-10 Thread Schwalbe Matthias
Hi Shilpa,

There is no need to have artificial messages in the input Kafka topic (and I 
don’t see where Andrew suggests this).

However, your use case is not 100% clear as to which keys you want to emit 
0-count window results for, either:

  *   A) For all keys your job has ever seen (that’s easy), or
  *   B) For all keys your job has seen, but you stop sending 0-count windows 
after the first one is emitted, and only resume for a key when there is a new 
input event on that key, or
  *   C) For all keys from a pre-selection of keys

A KeyedProcessFunction is the way to go.
I’ll sketch a solution for scenario A); the others are similar (Scala-ish):

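import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// illustrative types: a keyed input event and the per-window result
case class Event(key: String, timestamp: Long)
case class WindowResult(key: String, windowEnd: Long, count: Long)

class Manual0Windowing(windowSize: Long)
    extends KeyedProcessFunction[String, Event, WindowResult] {

  // aggregated window state (here simply a count); "no value" is the 0-window state
  @transient private var state: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    state = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))

  // last timestamp of the epoch-aligned tumbling window containing ts
  private def windowMaxTimestamp(ts: Long): Long =
    ts - Math.floorMod(ts, windowSize) + windowSize - 1

  override def processElement(
      event: Event,
      ctx: KeyedProcessFunction[String, Event, WindowResult]#Context,
      out: Collector[WindowResult]): Unit = {
    // Flink keeps one timer per key and timestamp, so repeating this is cheap
    ctx.timerService().registerEventTimeTimer(windowMaxTimestamp(event.timestamp))
    val current = Option(state.value()).map(_.longValue).getOrElse(0L)
    state.update(current + 1)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, WindowResult]#OnTimerContext,
      out: Collector[WindowResult]): Unit = {
    val count = Option(state.value()).map(_.longValue).getOrElse(0L)
    // scenario A: always schedule the next window, so silent keys keep emitting 0
    // scenario B: only schedule it if (count > 0)
    ctx.timerService().registerEventTimeTimer(timestamp + windowSize)
    out.collect(WindowResult(ctx.getCurrentKey, timestamp + 1, count))
    state.clear()
  }
}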

… Just to give an idea.
… This code does not take care of late or out-of-order events (you would need 
to use a MapState keyed by windowEndTime instead).
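The note above about late events can be made concrete without a Flink cluster. Below is a minimal plain-Java model of the same per-key logic. It has no Flink dependency, so it runs standalone. Counts are stored per window end, the way a `MapState` keyed by `windowEndTime` would hold them. Timers are simulated as a sorted set of window ends per key, which fire once the watermark reaches `windowEnd - 1`. The following are all illustrative assumptions, not Flink API:
- the class `ZeroCountWindows`,
- the `stopAfterFirstZero` flag (scenario A vs. B),
- `seed` (scenario C),
- the choice to drop late events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical standalone model of per-key 0-count tumbling windows (not Flink API). */
class ZeroCountWindows {
    private final long size;                  // tumbling window size
    private final boolean stopAfterFirstZero; // false = scenario A, true = scenario B
    // key -> (windowEnd -> count), i.e. what a MapState keyed by windowEnd would hold
    private final Map<String, Map<Long, Long>> counts = new HashMap<>();
    // key -> pending "timers" (window ends); a set, so re-registering is a no-op as in Flink
    private final Map<String, TreeSet<Long>> timers = new HashMap<>();
    private long watermark = Long.MIN_VALUE;

    ZeroCountWindows(long size, boolean stopAfterFirstZero) {
        this.size = size;
        this.stopAfterFirstZero = stopAfterFirstZero;
    }

    /** Exclusive end of the epoch-aligned tumbling window containing ts. */
    static long windowEnd(long ts, long size) {
        return ts - Math.floorMod(ts, size) + size;
    }

    /** Scenario C: pre-register a known key so it reports even if it never sends events. */
    void seed(String key, long ts) {
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(windowEnd(ts, size));
    }

    void onEvent(String key, long ts) {
        long end = windowEnd(ts, size);
        if (end - 1 <= watermark) {
            return; // late: this window has already fired, so the event is dropped
        }
        counts.computeIfAbsent(key, k -> new HashMap<>()).merge(end, 1L, Long::sum);
        timers.computeIfAbsent(key, k -> new TreeSet<>()).add(end);
    }

    /**
     * Advances event time and returns "key@windowEnd=count" for every window that fired.
     * In scenario A, passing Long.MAX_VALUE would keep scheduling windows forever.
     */
    List<String> onWatermark(long wm) {
        watermark = Math.max(watermark, wm);
        List<String> fired = new ArrayList<>();
        for (String key : new TreeSet<>(timers.keySet())) { // sorted for deterministic output
            TreeSet<Long> pending = timers.get(key);
            while (!pending.isEmpty() && pending.first() - 1 <= watermark) {
                long end = pending.pollFirst();
                Long removed = counts.computeIfAbsent(key, k -> new HashMap<>()).remove(end);
                long count = removed == null ? 0L : removed;
                fired.add(key + "@" + end + "=" + count);
                if (count > 0 || !stopAfterFirstZero) {
                    pending.add(end + size); // re-register the timer for the next window
                }
            }
        }
        return fired;
    }
}
```

Counts are keyed by window end. So an event for the next window that arrives before the watermark has passed the current one is not mixed into the current count. That out-of-order case is exactly what a single `ValueState` misses.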

What do you think…?

Thias




Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Shilpa Shankar
Thanks Andrew.
We did consider this solution too. Unfortunately, we do not have permissions
to generate artificial Kafka events in our ecosystem.

Dario,
Thanks for your inputs. We will give your design a try. Due to the number of
events being processed per window, we are using an incremental aggregate
function
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#processwindowfunction-with-incremental-aggregation).
Do you think we can use a KeyedCoProcessFunction in this design?

Thanks,
Shilpa
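Incremental aggregation and the timer-based design can be combined: the keyed state only has to hold an accumulator per window, not the events themselves. Below is a minimal plain-Java sketch of such an accumulator for the average/max/min thresholds from the original question. It mirrors the shape of Flink's `AggregateFunction` (`createAccumulator`, `add`, `merge`, `getResult`) but deliberately does not depend on Flink. The `StatsAggregate`/`Acc` names and the string result are hypothetical. An empty accumulator is exactly the 0-event case.

```java
/** Hypothetical per-window count/avg/min/max accumulator (mirrors AggregateFunction's shape). */
final class StatsAggregate {
    /** Accumulator: constant-size state per key and window, however many events arrive. */
    static final class Acc {
        long count;
        double sum;
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(double value, Acc acc) {
        acc.count++;
        acc.sum += value;
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        return acc;
    }

    static Acc merge(Acc a, Acc b) {
        Acc out = new Acc();
        out.count = a.count + b.count;
        out.sum = a.sum + b.sum;
        out.min = Math.min(a.min, b.min);
        out.max = Math.max(a.max, b.max);
        return out;
    }

    /** A window with no events yields "count=0" and no statistics. */
    static String getResult(Acc acc) {
        if (acc.count == 0) {
            return "count=0";
        }
        return "count=" + acc.count + " avg=" + (acc.sum / acc.count)
                + " min=" + acc.min + " max=" + acc.max;
    }
}
```

One plausible `KeyedCoProcessFunction` wiring:
- The first input carries the known user ids and only registers timers.
- The second input carries the events and calls `add` on the accumulator for the event's window.
- `onTimer` emits `getResult` (which is `count=0` when nothing arrived) and then clears the accumulator.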









Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Dario Heinisch
It depends on the use case; in Shilpa's use case it is about users, so 
the user ids are probably known beforehand.

https://dpaste.org/cRe3G <= This is an example without a window, but 
essentially, Shilpa, you would be re-registering the timers every time they 
fire.
You would also have to ingest the user ids into your pipeline beforehand, 
so that a user who never has any event still gets a notification. So 
probably on startup ingest the user ids with a single source from the DB.

My example is pretty minimal, but the idea in your case stays the same:

- key by user
- have a co-process function to init the state with the user ids
- re-register the timers every time they fire
- use `env.getConfig().setAutoWatermarkInterval(1000)` to move the event 
time forward even if there is no data coming in (this is what you are 
probably looking for!!)
- then collect an Optional/CustomStruct/null or similar, depending on 
whether data is present or not
- and then you can check whether the event was triggered because there was 
data or because there wasn't data
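One caveat on the `setAutoWatermarkInterval` point above. That setting only controls how often Flink asks a periodic `WatermarkGenerator` for a new watermark (via `onPeriodicEmit`). Whether event time actually advances without data depends on the generator:
- The built-in `WatermarkStrategy.forBoundedOutOfOrderness` keeps emitting `maxTimestamp - outOfOrderness - 1`, which does not move while no events arrive.
- `withIdleness` only stops an idle split from holding back the others; it does not advance a completely silent stream.

The plain-Java model below shows one way a generator could fall back to the wall clock after a period of silence, so that the per-key timers still fire. The class name, the parameters and the idle rule are all hypothetical, not Flink API. In a real job the logic would live in a `WatermarkGenerator` passed via `WatermarkStrategy.forGenerator` and would read `System.currentTimeMillis()`. Here the wall clock is a parameter only to keep the model testable.

```java
/** Hypothetical model of a periodic watermark generator with a wall-clock idle fallback. */
class IdleFallbackWatermarks {
    private final long outOfOrdernessMs; // tolerated disorder between event timestamps
    private final long idleTimeoutMs;    // silence after which the wall clock is used
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventWallClockMs = Long.MIN_VALUE;
    private long lastWatermark = Long.MIN_VALUE;

    IdleFallbackWatermarks(long outOfOrdernessMs, long idleTimeoutMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Plays the role of WatermarkGenerator#onEvent: only remembers what has been seen. */
    void onEvent(long eventTimestamp, long wallClockMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventWallClockMs = wallClockMs;
    }

    /** Plays the role of WatermarkGenerator#onPeriodicEmit, called every auto-watermark interval. */
    long onPeriodicEmit(long wallClockMs) {
        long candidate = maxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestamp - outOfOrdernessMs - 1; // same formula as bounded out-of-orderness
        // no event seen yet, or none for idleTimeoutMs, counts as idle
        boolean idle = lastEventWallClockMs == Long.MIN_VALUE
                || wallClockMs - lastEventWallClockMs >= idleTimeoutMs;
        if (idle) {
            // assumption: event timestamps roughly track wall-clock time
            candidate = Math.max(candidate, wallClockMs - outOfOrdernessMs - 1);
        }
        lastWatermark = Math.max(lastWatermark, candidate); // never move the watermark backwards
        return lastWatermark;
    }
}
```

The fallback trades completeness for liveness. Suppose upstream systems are silent for longer than `idleTimeoutMs` and then replay old data, as in the maintenance scenario from the original question. Those events would arrive behind the watermark and count as late, so the timeout has to be chosen with that in mind.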


Best regards,

Dario



Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Andrew Otto
This sounds similar to a non-streaming problem we had at WMF.  We ingest
all event data from Kafka into HDFS/Hive and partition the Hive tables in
hourly directories.  If there are no events in a Kafka topic for a given
hour, we have no way of knowing if the hour has been ingested
successfully.  For all we know, the upstream producer pipeline might be
broken.

We solved this by emitting artificial 'canary' events into each topic
multiple times an hour.  The canary events producer uses the same code
pathways and services that (most) of our normal event producers do.  Then,
when ingesting into Hive, we filter out the canary events.  The ingestion
code has work to do and can mark an hour as complete, but still end up
writing no events to it.

Perhaps you could do the same?  Always emit artificial events, and filter
them out in your windowing code? The window should still fire since it will
always have events, even if you don't use them?
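The canary approach amounts to counting only the real events in each window, while the canaries guarantee that every window exists and fires. Here is a small plain-Java illustration. The `Event` class and its `canary` flag are hypothetical, since the thread does not say how canaries are marked. A window that received only canaries reports 0 real events. That tells "no traffic" apart from "pipeline broken", where no window fires at all.

```java
import java.util.List;

/** Hypothetical illustration of filtering canary events inside a window. */
final class CanaryCount {
    static final class Event {
        final String key;
        final boolean canary; // true for artificial events from the canary producer

        Event(String key, boolean canary) {
            this.key = key;
            this.canary = canary;
        }
    }

    /** Number of real (non-canary) events in one window's contents. */
    static long realEvents(List<Event> window) {
        return window.stream().filter(e -> !e.canary).count();
    }
}
```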






Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Shilpa Shankar
Hello,
We are building a Flink use case where we consume from a Kafka topic,
perform aggregations, and generate alerts based on average, max and
min thresholds. We also need to notify the users when there are 0 events in
a Tumbling Event Time Window. We are having trouble coming up with a
solution for this. The options we considered are below; please let us
know if there are other ideas we haven't looked into.

[1] Queryable State: Save the keys in each of the Process Window Functions.
Query the state from an external application and alert when a key is
missing after the 20-minute time interval has expired. We see that the
Queryable State feature is going to be deprecated. We do not want to go
down this path when we already know there is an EOL for it.

[2] Use Processing Time Windows: Using processing time instead of event
time would have been an option if our downstream applications sent
out events in real time. Maintenance of the downstream applications,
delays, etc. would result in a lot of data loss, which is undesirable.
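To illustrate the problem with option [2]: with tumbling windows, the window an event lands in depends on which timestamp is used. The plain-Java sketch below uses the 20-minute windows from option [1] and a hypothetical event, with times in minutes since midnight. The event happened at 10:05 but, because of downstream maintenance, only arrived at 10:25.
- Under event time it is still counted in [10:00, 10:20).
- Under processing time it lands in [10:20, 10:40), so the earlier window looks empty (a false 0-event alert) and the later one is inflated.

For non-negative timestamps, the start computed here matches Flink's `TimeWindow.getWindowStartWithOffset` with offset 0.

```java
/** Tumbling-window assignment by timestamp; times are minutes since midnight (hypothetical units). */
final class WindowAssignment {
    /** Start of the tumbling window of the given size that contains ts (offset 0). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Renders the window containing ts as "[HH:MM, HH:MM)". */
    static String window(long ts, long size) {
        long start = windowStart(ts, size);
        return "[" + hhmm(start) + ", " + hhmm(start + size) + ")";
    }

    private static String hhmm(long minutes) {
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }
}
```

For example, `window(605, 20)` (event time 10:05) gives `[10:00, 10:20)`, while `window(625, 20)` (arrival at 10:25) gives `[10:20, 10:40)`.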

Flink version : 1.14.3

Thanks,
Shilpa