Hi Max,

Thank you for your reply. Our use case is to run a batch job against a Kinesis
source. Our downstream systems still run in batch mode, so this application
will read from the Kinesis source periodically and generate batched outputs.
Without the ability to resume from a checkpoint, it will read the entire stream
every time.
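To make the use case concrete, here is a minimal, self-contained Java sketch that models it without Beam. The stream, the `Mark` record and `runBatch` are hypothetical stand-ins, not the Beam or KinesisIO API, and each run simply reads everything currently in the stream (a simplification of a time-bounded read). Passing `null` as the mark mimics always starting without a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an unbounded stream; not the Beam or Kinesis API.
public class PeriodicBatchSketch {

    // Hypothetical checkpoint mark: the offset of the next record to read.
    record Mark(int nextOffset) {}

    // Simulated stream that keeps growing between batch runs.
    static final List<String> STREAM = new ArrayList<>();

    // One batch run: reads everything currently available, starting from the
    // given mark (or from the beginning when the mark is null), appends the
    // records to `out`, and returns the mark to persist for the next run.
    static Mark runBatch(Mark resumeFrom, List<String> out) {
        int offset = (resumeFrom == null) ? 0 : resumeFrom.nextOffset();
        while (offset < STREAM.size()) {
            out.add(STREAM.get(offset));
            offset++;
        }
        return new Mark(offset);
    }

    public static void main(String[] args) {
        STREAM.add("a");
        STREAM.add("b");
        List<String> first = new ArrayList<>();
        Mark saved = runBatch(null, first);   // first run reads [a, b]

        STREAM.add("c");
        List<String> resumed = new ArrayList<>();
        runBatch(saved, resumed);             // resuming reads only [c]

        List<String> restarted = new ArrayList<>();
        runBatch(null, restarted);            // ignoring the mark re-reads [a, b, c]

        System.out.println(first + " " + resumed + " " + restarted);
    }
}
```

With a persisted mark, each periodic run only picks up new records; without it, every run starts over from the beginning of the stream.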

Regards,
Mani

-----Original Message-----
From: Maximilian Michels <[email protected]> 
Sent: Tuesday, July 21, 2020 11:38 AM
To: [email protected]
Cc: Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used


Hi Mani,

BoundedReadFromUnboundedSource was originally intended to be used in batch 
pipelines. In batch, runners typically do not perform checkpointing. In case of 
failures, they re-run the entire pipeline.

Keep in mind that, even with checkpointing, reading for a finite time in the 
processing time domain from an unbounded source rarely gives consistent results 
across runs.
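As a rough illustration of that point, here is a deterministic Java simulation using a simulated clock and a hypothetical helper `recordsReadWithin` (no Beam API involved). With the same read-time budget and the same number of records in the stream, different arrival timing yields a different number of records read per run.

```java
// Hypothetical simulation; no Beam or Kinesis API is used. Time is simulated
// so the example is deterministic.
public class ProcessingTimeBudgetSketch {

    // Reads records in arrival order until the next record would arrive after
    // the processing-time budget expires. arrivalGapsMillis[i] is the simulated
    // wait before record i becomes available.
    static int recordsReadWithin(long maxReadMillis, long[] arrivalGapsMillis) {
        long elapsed = 0;
        int read = 0;
        for (long gap : arrivalGapsMillis) {
            if (elapsed + gap > maxReadMillis) {
                break; // budget exhausted; remaining records are not read in this run
            }
            elapsed += gap;
            read++;
        }
        return read;
    }

    public static void main(String[] args) {
        long budget = 250;
        // Four records in both runs, but with different arrival timing.
        int run1 = recordsReadWithin(budget, new long[] {100, 100, 100, 100});
        int run2 = recordsReadWithin(budget, new long[] {50, 50, 50, 50});
        System.out.println(run1 + " vs " + run2); // prints "2 vs 4"
    }
}
```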

However, ignoring the checkpoint looks problematic. We may want to fail during 
checkpointing to prevent violating correctness (e.g. exactly-once semantics).
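One way to picture "failing during checkpointing" is a reader that throws when asked for a checkpoint it could never honor, instead of handing out a mark that is later ignored. This is a minimal sketch with hypothetical types (`CheckpointableReader`, `NonResumableBoundedReader`); it is not the Beam `UnboundedReader` API or a proposed patch.

```java
// Hypothetical types; this is not the Beam UnboundedReader API.
public class FailOnCheckpointSketch {

    // Minimal stand-in for a reader that can be asked for a checkpoint.
    interface CheckpointableReader {
        Object checkpoint();
    }

    // A reader for a time-bounded read that has no way to restore state.
    // Rather than returning a mark that would later be ignored (which could
    // lead to duplicated or skipped records on recovery), it fails loudly.
    static final class NonResumableBoundedReader implements CheckpointableReader {
        @Override
        public Object checkpoint() {
            throw new UnsupportedOperationException(
                "this bounded read cannot be resumed from a checkpoint");
        }
    }

    public static void main(String[] args) {
        CheckpointableReader reader = new NonResumableBoundedReader();
        try {
            reader.checkpoint();
        } catch (UnsupportedOperationException e) {
            System.out.println("checkpoint rejected: " + e.getMessage());
        }
    }
}
```

The design choice here is to surface the limitation at checkpoint time, so a runner cannot silently restart from scratch and break exactly-once expectations.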

-Max

On 21.07.20 11:36, Sunny, Mani Kolbe wrote:
> Observed on v2.22.0
>
> When withMaxReadTime() is used, Beam creates a
> BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
> BoundedReadFromUnboundedSource is responsible for reading
> records from the source. This class does not check whether a
> recoverable checkpoint exists; instead, it always creates the reader
> with checkpointMark set to null [2].
>
> Is this the desired behavior? More importantly, do you guys recommend
> using withMaxReadTime() in a production setup, or is it intended more
> for demo use cases?
>
> I have created a JIRA for this (BEAM-104934
> <https://issues.apache.org/jira/browse/BEAM-104934>)
> and am looking to work on a patch for it, but would like confirmation
> on the above first.
>
> Regards,
>
> Mani
>
> References:
>
> [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
>
> [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
>
