Re: tumbling event time window , parallel

2019-09-02 Thread Fabian Hueske
I meant that you should not use Flink's built-in windows at all, but instead
implement your logic in a KeyedProcessFunction.

So basically:
myDataStream.keyBy(...).process(new MyKeyedProcessFunction)
instead of:
myDataStream.keyBy(...).window(...).process(new MyWindowProcessFunction)
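To make the suggestion concrete, here is a minimal sketch of the per-key bookkeeping such a KeyedProcessFunction could do. It is plain Java with no Flink dependency, so the class and method names (`ManualTumblingWindow`, `onWatermark`) are illustrative, not a Flink API: in a real job the map would live in keyed `MapState`, and `onWatermark` would be an `onTimer` callback for an event-time timer registered at `windowStart + windowSize - 1`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Per-key logic a KeyedProcessFunction could use to emulate a tumbling
// event-time count window without the built-in window operator.
public class ManualTumblingWindow {
    private final long windowSize;
    // In Flink this would be keyed MapState<Long, Long>: windowStart -> count.
    private final Map<Long, Long> countsPerWindow = new HashMap<>();

    public ManualTumblingWindow(long windowSize) {
        this.windowSize = windowSize;
    }

    // Start of the tumbling window [start, start + size) containing the timestamp.
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // processElement(): bump the count for the record's window; a real
    // implementation would also register an event-time timer for
    // windowStart + windowSize - 1 here.
    public void processElement(long timestamp) {
        countsPerWindow.merge(windowStart(timestamp, windowSize), 1L, Long::sum);
    }

    // onTimer(): emit and clear every window whose last covered timestamp
    // is at or below the watermark. Returns {windowStart, count} pairs.
    public List<long[]> onWatermark(long watermark) {
        List<long[]> fired = new ArrayList<>();
        countsPerWindow.entrySet().removeIf(e -> {
            if (e.getKey() + windowSize - 1 <= watermark) {
                fired.add(new long[]{e.getKey(), e.getValue()});
                return true;
            }
            return false;
        });
        return fired;
    }
}
```

The upside of this approach is full control: you decide per record and per timer what to do with late data instead of relying on the window operator's drop/allowed-lateness behavior.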


RE: tumbling event time window , parallel

2019-09-02 Thread Hanan Yehudai
I'm not sure what you mean by "use a ProcessFunction and not a window process
function", as the window operator takes a ProcessWindowFunction.


Re: tumbling event time window , parallel

2019-08-26 Thread Fabian Hueske
I would use a regular ProcessFunction, not a WindowProcessFunction.

The final WM depends on how the records were partitioned at the watermark
assigner (and the assigner itself).
AFAIK, the distribution of files to source reader tasks is not
deterministic. Hence, the final WM changes from run to run.

Fabian



RE: tumbling event time window , parallel

2019-08-26 Thread Hanan Yehudai
You said: “You can use a custom ProcessFunction and compare the timestamp of
each record with the current watermark.”

Does the window process function have all the events, even the ones that are
dropped due to lateness?
From what I understand, the Iterable argument contains the records that were
inserted into the window and NOT the ones that were dropped. Isn’t that correct?


Also,
when looking at Flink’s monitoring page for the watermarks, I see different
values even after all my files were processed, which is something I would not
expect.
I would expect that eventually the WM will be the highest EVENT_TIME in my
set of files.


thanks



Re: tumbling event time window , parallel

2019-08-26 Thread Fabian Hueske
Hi,

The paths of the files to read are distributed across all reader / source
tasks and each task reads the files in order of their modification
timestamp.
The watermark generator is not aware of any files and just looks at the
stream of records produced by the source tasks.
You need to choose the WM generator strategy such that you minimize the
number of late records.

I'd recommend first investigating how many late records you are dealing
with.
You can use a custom ProcessFunction and compare the timestamp of each
record with the current watermark.
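The check Fabian describes is small; here is a plain-Java sketch of it (the class name `LateRecordCounter` is mine, not a Flink API). In a real ProcessFunction the watermark would be read inside `processElement()` via `ctx.timerService().currentWatermark()`.

```java
// Counts records that arrive at or below the current watermark, i.e.
// records a window operator would consider late.
public class LateRecordCounter {
    private long currentWatermark = Long.MIN_VALUE;
    private long lateRecords = 0;

    // Watermark observed on the stream; watermarks never move backwards.
    public void onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    // Returns true if the record would be considered late: a watermark T
    // asserts that no more records with timestamp <= T should arrive.
    public boolean processElement(long recordTimestamp) {
        boolean late = recordTimestamp <= currentWatermark;
        if (late) {
            lateRecords++;
        }
        return late;
    }

    public long getLateRecords() {
        return lateRecords;
    }
}
```

Emitting the late count (or the late records themselves) to a side output or a metric makes it easy to see how much lateness a given WM strategy produces.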

AllowedLateness is not a magical cure either. It just emits updates
downstream, i.e., you need to replace earlier results that are superseded by a
more complete result.

Best, Fabian



RE: tumbling event time window , parallel

2019-08-26 Thread Hanan Yehudai
The data source is generated by an application that monitors some sort of
sessions, with the EVENT_TIME column being the session end time.

It is possible that the files will have out-of-order data because of the
async nature of the application writing the files.
While the EVENT_TIME is monotonically increasing in general, some lateness
is possible. However, I used allowedLateness on my stream and still got the
inconsistencies.

Although the real-life use case is generically reading files from a folder,
the testing env has a set of files prepared in advance; these should be
read and produce the result.

You mentioned the “right” order of the files. Is it sorted by update time?
When running in parallel, is it possible that 2 files will be read in
parallel, and in case the latter one is smaller, its later timestamps will be
handled first?


BTW, I tried to use a ContinuousEventTimeTrigger to make sure the window is
calculated, and got the processing to trigger multiple times, so I’m not sure
exactly how this type of trigger works.

Thanks






Re: tumbling event time window , parallel

2019-08-26 Thread Fabian Hueske
Hi,

Can you share a few more details about the data source?
Are you continuously ingesting files from a folder?

You are correct that the parallelism should not affect the results, but
there are a few things that can affect them:
1) non-deterministic keys
2) out-of-order data with inappropriate watermarks

Note that watermark configuration for file ingests can be difficult and
that you need to ensure that files are read in the "right" order.
AFAIK, Flink's continuous file source uses the modification timestamp of
files to determine the read order.
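For reference, the stock strategy for mildly out-of-order streams in this Flink generation is `BoundedOutOfOrdernessTimestampExtractor`, whose core rule is simply "watermark = highest timestamp seen so far minus a fixed allowed delay". A plain-Java sketch of that core (the class name is mine, not the Flink class):

```java
// Core of the bounded-out-of-orderness watermark strategy: the watermark
// trails the maximum event timestamp seen by a fixed allowed delay, so
// records up to that much out of order are not considered late.
public class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every record with its event timestamp.
    public void onEvent(long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    }

    // Records with timestamp <= this value are considered late downstream.
    public long currentWatermark() {
        return maxTimestampSeen - maxOutOfOrderness;
    }
}
```

Choosing the delay is the trade-off Fabian mentions: a larger delay drops fewer records as late, but windows fire later.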

Best, Fabian



tumbling event time window , parallel

2019-08-25 Thread Hanan Yehudai
I have an issue with tumbling windows running in parallel.

I run a job on a set of CSV files.

When the parallelism is set to 1, I get the proper results.
When it runs in parallel, I get no output.
Is it due to the fact that the parallel streams take the MAX(watermark) from
all the parallel sources, and only one of the streams advances the watermark?

It seems wrong that the result is not deterministic and depends on the
parallelism level.
What am I doing wrong?
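One note on the MAX assumption in the question: a downstream Flink operator combines the watermarks of its parallel inputs with the MINIMUM, which is exactly why a single source subtask that never advances its watermark (e.g. one that was assigned no files) can keep every event-time window from firing. A plain-Java sketch of the combination rule (the class name is mine, not a Flink API):

```java
import java.util.Arrays;

// How a downstream operator combines watermarks from parallel input
// channels: it tracks the latest watermark per channel and forwards the
// minimum across channels, so one stalled channel (stuck at Long.MIN_VALUE)
// holds the operator's watermark back indefinitely.
public class WatermarkCombiner {
    private final long[] channelWatermarks;

    public WatermarkCombiner(int parallelInputs) {
        channelWatermarks = new long[parallelInputs];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // A watermark arrives on one channel; returns the combined watermark
    // the operator would emit downstream.
    public long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

This matches the symptom described: with parallelism 1 there is a single channel, so the watermark advances and windows fire; with higher parallelism, any source subtask that emits no watermark pins the minimum and no window output appears.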