Thank you, Max. Is there reference code somewhere for implementing checkpointing in BoundedReadFromUnboundedSource? I couldn’t quite figure out where to get the restored checkpointMark from. It appears to be specific to runner implementations.
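As a rough illustration of what option (1) in the reply below would involve, the sketch models one bounded run that starts from a restored checkpoint mark (if any), stops after a fixed number of records, and returns a new mark to persist for the next run. The `Mark` and `RunResult` types, the in-memory log, and the idea of persisting the mark outside the runner between periodic batch runs are hypothetical simplifications, not Beam's API. In Beam the corresponding hooks are `UnboundedSource.createReader(PipelineOptions, CheckpointMarkT)` and `UnboundedReader.getCheckpointMark()`, and where a restored mark comes from remains runner-specific.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of a bounded read over an unbounded source that resumes
 * from a restored checkpoint mark. Simplified stand-ins, NOT Beam's API.
 */
public class CheckpointedBoundedRead {

  /** Hypothetical checkpoint mark: offset of the next record to read. */
  static final class Mark {
    final long nextOffset;

    Mark(long nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  /** Records emitted by one bounded run plus the mark to persist afterwards. */
  static final class RunResult {
    final List<String> records;
    final Mark mark;

    RunResult(List<String> records, Mark mark) {
      this.records = Collections.unmodifiableList(records);
      this.mark = mark;
    }
  }

  /**
   * Reads at most maxRecords from an append-only log, starting at the restored
   * mark, or at offset 0 when no mark was restored (restored == null).
   */
  static RunResult readBounded(List<String> log, Mark restored, int maxRecords) {
    long offset = (restored == null) ? 0L : restored.nextOffset;
    List<String> out = new ArrayList<>();
    while (offset < log.size() && out.size() < maxRecords) {
      out.add(log.get((int) offset));
      offset++;
    }
    // The new mark points just past the last emitted record.
    return new RunResult(out, new Mark(offset));
  }

  public static void main(String[] args) {
    List<String> log = List.of("a", "b", "c", "d", "e");
    RunResult first = readBounded(log, null, 2);        // no mark: starts at "a"
    RunResult second = readBounded(log, first.mark, 2); // resumes at "c"
    System.out.println(first.records + " " + second.records);
  }
}
```

A fixed record count stands in for withMaxReadTime() here so the example is deterministic; as noted in the thread, a processing-time bound rarely gives consistent results across runs.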
-----Original Message-----
From: Maximilian Michels <[email protected]>
Sent: Monday, July 27, 2020 3:04 PM
To: [email protected]; Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used

CAUTION: This email originated from outside of D&B. Please do not click links or open attachments unless you recognize the sender and know the content is safe.

Hi Mani,

Just to let you know, I think it would make sense to either

(1) implement checkpointing for BoundedReadFromUnboundedSource, or
(2) throw an error in case a checkpoint mark is provided.

As you pointed out, ignoring it, as we currently do, does not seem like a feasible solution.

-Max

On 21.07.20 16:16, Sunny, Mani Kolbe wrote:
> Hi Alexey,
>
> I did explore that route. The problem there is identifying which
> timestamp to use. While processing records, you can capture the
> timestamp. This cannot be processing time; it has to be the event time
> of the Kinesis record. As far as I can see, the event time of a Kinesis
> record is generated from the ApproximateArrivalTimestamp field of the
> GetRecords API call. According to the Kinesis docs [1]: "There are no
> guarantees about the time stamp accuracy, or that the time stamp is
> always increasing. For example, records in a shard or across a stream
> might have time stamps that are out of order."
>
> So there is a chance of skipping records if the timestamps captured at
> checkpoint time were out of order. Also, there is no provision to
> provide a per-shard timestamp while initializing KinesisReader.
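To make the out-of-order concern concrete, here is a hypothetical simulation of a single shard. It treats "resume from timestamp T" as "emit only records stamped after T", which is a simplification of Kinesis's AT_TIMESTAMP iterator positioning, and it models sequence numbers as plain longs (real Kinesis sequence numbers are large numeric strings). `Rec`, `resumeByTimestamp`, and `resumeBySequence` are illustrative names only, not KinesisIO code.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation: resuming one shard from a checkpointed arrival
 * timestamp versus from the last sequence number read. Not KinesisIO code.
 */
public class TimestampResumeDemo {

  /** One record in a single shard: its position and approximate arrival time. */
  static final class Rec {
    final long seq;           // position within the shard (simplified to a long)
    final long arrivalMillis; // approximate arrival timestamp, may be out of order

    Rec(long seq, long arrivalMillis) {
      this.seq = seq;
      this.arrivalMillis = arrivalMillis;
    }
  }

  /** Simplified "resume from timestamp": keep only records stamped after resumeAfterMillis. */
  static List<Long> resumeByTimestamp(List<Rec> shard, long resumeAfterMillis) {
    List<Long> out = new ArrayList<>();
    for (Rec r : shard) {
      if (r.arrivalMillis > resumeAfterMillis) {
        out.add(r.seq);
      }
    }
    return out;
  }

  /** Resume from a position checkpoint: keep records after the last sequence number read. */
  static List<Long> resumeBySequence(List<Rec> shard, long lastSeqRead) {
    List<Long> out = new ArrayList<>();
    for (Rec r : shard) {
      if (r.seq > lastSeqRead) {
        out.add(r.seq);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Record 3 carries an earlier timestamp than record 2.
    List<Rec> shard =
        List.of(new Rec(1, 100), new Rec(2, 300), new Rec(3, 200), new Rec(4, 400));
    // Suppose the previous run stopped after reading records 1 and 2,
    // so the latest timestamp it saw was 300.
    System.out.println("by timestamp: " + resumeByTimestamp(shard, 300)); // [4]: record 3 is skipped
    System.out.println("by sequence:  " + resumeBySequence(shard, 2));    // [3, 4]
  }
}
```

In this model only the per-shard position checkpoint resumes without losing record 3, which is the gap described above.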
>
> [1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
>
> Regards,
>
> Mani
>
> *From:* Alexey Romanenko <[email protected]>
> *Sent:* Tuesday, July 21, 2020 2:01 PM
> *To:* [email protected]
> *Subject:* Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used
>
> Hi Mani,
>
> Knowing when your last run was stopped (since you use batch mode),
> could you leverage “withInitialPositionInStream()” or
> “withInitialTimestampInStream()” for KinesisIO in this case?
>
> Alexey
>
> On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]> wrote:
>
> Hi Max,
>
> Thank you for your reply. Our use case is to run a batch job against
> a Kinesis source. Our downstream systems are still in batch mode, so
> this application will read from the Kinesis source periodically and
> generate batched outputs. Without the ability to resume from a
> checkpoint, it will read the entire stream every time.
>
> Regards,
> Mani
>
> -----Original Message-----
> From: Maximilian Michels <[email protected]>
> Sent: Tuesday, July 21, 2020 11:38 AM
> To: [email protected]
> Cc: Sunny, Mani Kolbe <[email protected]>
> Subject: Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used
>
> Hi Mani,
>
> BoundedReadFromUnboundedSource was originally intended to be used in
> batch pipelines. In batch, runners typically do not perform
> checkpointing. In case of failures, they re-run the entire pipeline.
>
> Keep in mind that, even with checkpointing, reading for a finite
> time in the processing-time domain from an unbounded source rarely
> gives consistent results across runs.
>
> However, ignoring the checkpoint looks problematic. We may want to
> fail during checkpointing to prevent violating correctness (e.g.
> exactly-once semantics).
>
> -Max
>
> On 21.07.20 11:36, Sunny, Mani Kolbe wrote:
> > Observed on v2.22.0.
> >
> > When withMaxReadTime() is used, Beam creates a
> > BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
> > BoundedReadFromUnboundedSource is responsible for reading records
> > from the source. As you can see, this class doesn't check whether a
> > recoverable checkpoint exists. Instead, it always creates the Reader
> > with the checkpointMark set to null [2].
> >
> > Is this the desired behavior? More importantly, do you recommend
> > using withMaxReadTime() in a production setup? Or is it more for
> > demo use cases?
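The behavior described above and option (2) from the reply at the top of this thread can be contrasted in a small hypothetical model: a reader factory that either silently drops a restored checkpoint mark (as ReadFn is reported to do in v2.22.0), honors it, or fails fast when one is supplied. `Policy`, `Mark`, and `startOffset` are illustrative names, not Beam classes.

```java
/**
 * Hypothetical model of how a bounded reader could treat a restored checkpoint
 * mark. Illustrative only; these are not Beam classes.
 */
public class CheckpointPolicyDemo {

  enum Policy { IGNORE, RESUME, FAIL_FAST }

  /** Hypothetical checkpoint mark: offset of the next record to read. */
  static final class Mark {
    final long nextOffset;

    Mark(long nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  /** Returns the offset a new reader starts at, given a restored mark (null = none). */
  static long startOffset(Mark restored, Policy policy) {
    if (restored == null) {
      return 0L; // fresh start: no checkpoint to honor
    }
    switch (policy) {
      case IGNORE:
        return 0L; // mark silently dropped: reading restarts from the beginning
      case RESUME:
        return restored.nextOffset;
      case FAIL_FAST:
        throw new UnsupportedOperationException(
            "checkpoint mark provided but checkpoint restore is not supported");
      default:
        throw new AssertionError("unknown policy: " + policy);
    }
  }

  public static void main(String[] args) {
    Mark restored = new Mark(42);
    System.out.println("IGNORE -> " + startOffset(restored, Policy.IGNORE)); // 0
    System.out.println("RESUME -> " + startOffset(restored, Policy.RESUME)); // 42
    try {
      startOffset(restored, Policy.FAIL_FAST);
    } catch (UnsupportedOperationException e) {
      System.out.println("FAIL_FAST -> " + e.getMessage());
    }
  }
}
```

Failing fast trades availability for correctness: a pipeline that cannot honor a checkpoint stops instead of silently re-reading data, which matches the exactly-once concern raised in the thread.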
> >
> > I have created a JIRA for this (BEAM-104934,
> > https://issues.apache.org/jira/browse/BEAM-104934) and am looking to
> > work on a patch, but I would like confirmation on the above first.
> >
> > Regards,
> >
> > Mani
> >
> > Reference:
> >
> > [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
> >
> > [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
