Thank you, Max. Is there reference code somewhere for implementing checkpointing 
for BoundedReadFromUnboundedSource? I couldn't quite figure out where to get 
the restored checkpointMark. It appears to be specific to runner implementations.

-----Original Message-----
From: Maximilian Michels <[email protected]> 
Sent: Monday, July 27, 2020 3:04 PM
To: [email protected]; Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used

Hi Mani,

Just to let you know I think it would make sense to either

(1) implement checkpointing for BoundedReadFromUnboundedSource or
(2) throw an error in case of a provided checkpoint mark

As you pointed out, ignoring it as we currently do does not seem like a 
feasible solution.
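
To make the two options concrete, here is a minimal sketch in plain Java. It is a toy model and not Beam code: the "source" is a list, the checkpoint mark is simply the next offset to read, and maxRecords stands in for the read bound. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Beam code: a source is a list, a checkpoint mark is the next offset. */
class CheckpointOptionsSketch {

  /** Behavior described in this thread: the restored mark is silently dropped. */
  static List<String> readIgnoringMark(List<String> source, Integer restoredMark, int maxRecords) {
    return read(source, 0, maxRecords);
  }

  /** Option (1): resume from the restored mark when one is provided. */
  static List<String> readResuming(List<String> source, Integer restoredMark, int maxRecords) {
    int start = (restoredMark == null) ? 0 : restoredMark;
    return read(source, start, maxRecords);
  }

  /** Option (2): fail loudly instead of re-reading from the start. */
  static List<String> readFailing(List<String> source, Integer restoredMark, int maxRecords) {
    if (restoredMark != null) {
      throw new UnsupportedOperationException(
          "A checkpoint mark was provided, but resuming is not supported");
    }
    return read(source, 0, maxRecords);
  }

  /** Reads at most maxRecords elements starting at offset start. */
  private static List<String> read(List<String> source, int start, int maxRecords) {
    int end = Math.min(source.size(), start + maxRecords);
    return new ArrayList<>(source.subList(Math.min(start, end), end));
  }

  public static void main(String[] args) {
    List<String> source = List.of("a", "b", "c", "d");
    System.out.println(readIgnoringMark(source, 2, 2)); // [a, b]: re-reads old records
    System.out.println(readResuming(source, 2, 2));     // [c, d]: continues after the mark
  }
}
```

In this model, readFailing(source, 2, 2) throws, which is the "throw an error" variant: the caller learns that the mark was not honored instead of getting repeated records.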

-Max

On 21.07.20 16:16, Sunny, Mani Kolbe wrote:
> Hi Alexey,
>
> I did explore that route. The problem there is identifying which
> timestamp to use. While processing records, you can capture the
> timestamp. This cannot be processing time; it has to be the event time
> on the Kinesis record. As far as I can see, the event time on a Kinesis
> record is taken from the ApproximateArrivalTimestamp field returned by
> the getRecords API call. According to the Kinesis doc [1]: "There are
> no guarantees about the time stamp accuracy, or that the time stamp is
> always increasing. For example, records in a shard or across a stream
> might have time stamps that are out of order."
>
> So there is a chance of skipping records if the timestamps captured at
> checkpoint time were out of order. Also, there is no provision for
> supplying a per-shard timestamp when initializing KinesisReader.
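
The skipping concern above can be illustrated with a small toy model in plain Java. It is not KinesisIO code. It assumes, purely for illustration, that a timestamp-based resume delivers only records whose arrival timestamp is strictly later than the saved one; the class name, method names, and timestamp values are all made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not KinesisIO: shows how resuming by timestamp can skip records. */
class TimestampResumeSketch {

  /** Index = position in the shard; value = approximate arrival time in ms (not monotonic). */
  static final long[] ARRIVALS = {100L, 105L, 103L, 110L};

  /** Assumed resume rule: deliver positions whose timestamp is strictly later than savedMillis. */
  static List<Integer> resumeAfter(long[] arrivals, long savedMillis) {
    List<Integer> delivered = new ArrayList<>();
    for (int i = 0; i < arrivals.length; i++) {
      if (arrivals[i] > savedMillis) {
        delivered.add(i);
      }
    }
    return delivered;
  }

  /** Unread positions (from firstUnread onward) that the timestamp-based resume never delivers. */
  static List<Integer> skipped(long[] arrivals, int firstUnread, long savedMillis) {
    List<Integer> delivered = resumeAfter(arrivals, savedMillis);
    List<Integer> missing = new ArrayList<>();
    for (int i = firstUnread; i < arrivals.length; i++) {
      if (!delivered.contains(i)) {
        missing.add(i);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // The first run read positions 0 and 1 and saved the last timestamp it saw: 105.
    System.out.println(resumeAfter(ARRIVALS, 105L)); // [3]
    System.out.println(skipped(ARRIVALS, 2, 105L));  // [2]: arrived "earlier" than 105, so it is lost
  }
}
```

Position 2 was never read by the first run, but its timestamp (103) is older than the saved one (105), so under the assumed rule the second run never sees it.
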
>
> [1]
> https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
>
> Regards,
>
> Mani
>
> *From:* Alexey Romanenko <[email protected]>
> *Sent:* Tuesday, July 21, 2020 2:01 PM
> *To:* [email protected]
> *Subject:* Re: Unbounded sources unable to recover from checkpointMark 
> when withMaxReadTime() is used
>
> Hi Mani,
>
> Knowing when your last run stopped (since you use batch mode),
> could you leverage “withInitialPositionInStream()” or
> “withInitialTimestampInStream()” for KinesisIO in this case?
>
> Alexey
>
>
>
>     On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]> wrote:
>
>     Hi Max,
>
>     Thank you for your reply. Our use case is to run a batch job against
>     a Kinesis source. Our downstream systems are still in batch mode, so
>     this application will read from the Kinesis source periodically and
>     generate batched outputs. Without the ability to resume from a
>     checkpoint, it will read the entire stream every time.
>
>     Regards,
>     Mani
>
>     -----Original Message-----
>     From: Maximilian Michels <[email protected]>
>     Sent: Tuesday, July 21, 2020 11:38 AM
>     To: [email protected]
>     Cc: Sunny, Mani Kolbe <[email protected]>
>     Subject: Re: Unbounded sources unable to recover from checkpointMark
>     when withMaxReadTime() is used
>
>     Hi Mani,
>
>     BoundedReadFromUnboundedSource was originally intended to be used in
>     batch pipelines. In batch, runners typically do not perform
>     checkpointing. In case of failures, they re-run the entire pipeline.
>
>     Keep in mind that, even with checkpointing, reading for a finite
>     time in the processing time domain from an unbounded source rarely
>     gives consistent results across runs.
>
>     However, ignoring the checkpoint looks problematic. We may want to
>     fail during checkpointing to prevent violating correctness (e.g.
>     exactly-once semantics).
>
>     -Max
>
>     On 21.07.20 11:36, Sunny, Mani Kolbe wrote:
>
>         Observed on v2.22.0
>
>         When withMaxReadTime() is used, Beam creates a
>         BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
>         BoundedReadFromUnboundedSource is responsible for reading
>         records from the source. You can see that this class doesn't
>         check whether a recoverable checkpoint exists. Instead, it
>         always creates the Reader with checkpointMark set to null [2].
>
>         Is this the desired behavior? More importantly, do you recommend
>         using withMaxReadTime() in a production setup? Or is this more
>         for demo use cases?
>
>         I have created a Jira issue for this (BEAM-104934
>         <https://issues.apache.org/jira/browse/BEAM-104934>) and am
>         looking to work on a patch. But I would like confirmation on
>         the above first.
>
>         Regards,
>
>         Mani
>
>         Reference:
>
>         [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
>
>         [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
>
