Hi Mani,

Since you use batch mode, you know when your last run was stopped. Could you use "withInitialPositionInStream()" or "withInitialTimestampInStream()" on KinesisIO in this case?
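Here is a minimal sketch of what this could look like. It assumes your job can keep a small marker file between runs. LastRunMarker, its methods, and the marker path are hypothetical and are not part of Beam. The class only stores an Instant from the previous run, so the next run can pick its starting point from it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical helper, not part of Beam or KinesisIO: persists the instant from
 * which the next batch run should resume reading the Kinesis stream.
 */
final class LastRunMarker {
  private final Path markerFile;

  LastRunMarker(Path markerFile) {
    this.markerFile = markerFile;
  }

  /** Instant recorded by the previous run, or empty if no run has recorded one yet. */
  Optional<Instant> lastStop() {
    if (!Files.exists(markerFile)) {
      return Optional.empty();
    }
    try {
      // The file holds a single ISO-8601 instant written by recordStop().
      String text = Files.readString(markerFile, StandardCharsets.UTF_8).trim();
      return Optional.of(Instant.parse(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Overwrites the marker; call only after this run's output has been committed. */
  void recordStop(Instant stoppedAt) {
    try {
      Files.writeString(markerFile, stoppedAt.toString(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

On the pipeline side, the idea is as follows. On the first run, when lastStop() is empty, configure KinesisIO.read().withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON). On later runs, pass the stored value through withInitialTimestampInStream(new org.joda.time.Instant(stop.toEpochMilli())). In both cases, keep withMaxReadTime(...) to bound each run. AWS client configuration is omitted here. The conversion is needed because KinesisIO takes Joda-Time types.

Records near the boundary may still be read twice or skipped, so, as Max notes, results are not guaranteed to be consistent across runs. If a run can end before it catches up with the stream, it is probably safer to store the latest approximate arrival timestamp among the records actually read than the wall-clock stop time.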
Alexey

> On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]> wrote:
>
> Hi Max,
>
> Thank you for your reply. Our use case is to run a batch job against a
> Kinesis source. Our downstream systems are still in batch mode, so this
> application will read from the Kinesis source periodically and generate
> batched outputs. Without the ability to resume from a checkpoint, it will
> read the entire stream every time.
>
> Regards,
> Mani
>
> -----Original Message-----
> From: Maximilian Michels <[email protected]>
> Sent: Tuesday, July 21, 2020 11:38 AM
> To: [email protected]
> Cc: Sunny, Mani Kolbe <[email protected]>
> Subject: Re: Unbounded sources unable to recover from checkpointMark when
> withMaxReadTime() is used
>
> Hi Mani,
>
> BoundedReadFromUnboundedSource was originally intended to be used in batch
> pipelines. In batch, runners typically do not perform checkpointing. In case
> of failures, they re-run the entire pipeline.
>
> Keep in mind that, even with checkpointing, reading for a finite time in the
> processing time domain from an unbounded source rarely gives consistent
> results across runs.
>
> However, ignoring the checkpoint looks problematic. We may want to fail
> during checkpointing to prevent violating correctness (e.g. exactly-once
> semantics).
>
> -Max
>
> On 21.07.20 11:36, Sunny, Mani Kolbe wrote:
>> Observed on v2.22.0
>>
>> When withMaxReadTime() is used, Beam creates a
>> BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
>> BoundedReadFromUnboundedSource is responsible for reading records from
>> the source. As you can see, this class doesn't check whether a recoverable
>> checkpoint exists. Instead, it always creates the Reader with
>> checkpointMark set to null [2].
>>
>> Is this the desired behavior? More importantly, do you recommend using
>> withMaxReadTime() in a production setup, or is it intended more for demo
>> use cases?
>>
>> I have created a JIRA for this (BEAM-104934
>> <https://issues.apache.org/jira/browse/BEAM-104934>) and am looking to
>> work on a patch for it, but I would like a confirmation on the above first.
>>
>> Regards,
>>
>> Mani
>>
>> References:
>>
>> [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
>>
>> [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
