Hi Mani,

The implementation of BoundedReadFromUnboundedSource currently doesn't support that in the same way that sources checkpoint. We would either have to convert it into a proper source (it's a DoFn atm) or store the checkpoint mark in Beam-managed state (which will be checkpointed). The latter is probably easier.
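The "store the checkpoint mark in state" option can be sketched as a toy model in plain Java. This is not Beam code and rests on stated assumptions: a HashMap keyed by split id stands in for Beam-managed per-key state, a long offset stands in for the CheckpointMark, and the class and method names are hypothetical. Each call resumes from the stored mark and writes the new mark back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical names, not Beam APIs): a HashMap stands in for
 * runner-managed state and a long offset stands in for a CheckpointMark.
 */
public class StatefulCheckpointSketch {

    // Stand-in for checkpointed state: split id -> offset of the next record to read.
    private final Map<String, Long> checkpointState = new HashMap<>();

    // Stand-in for the unbounded source: split id -> records in order.
    private final Map<String, List<String>> source;

    public StatefulCheckpointSketch(Map<String, List<String>> source) {
        this.source = source;
    }

    /** Reads up to maxRecords, starting at the stored mark (or 0), then stores the new mark. */
    public List<String> readBatch(String splitId, int maxRecords) {
        List<String> records = source.getOrDefault(splitId, Collections.emptyList());
        long pos = checkpointState.getOrDefault(splitId, 0L);
        List<String> out = new ArrayList<>();
        while (pos < records.size() && out.size() < maxRecords) {
            out.add(records.get((int) pos));
            pos++;
        }
        checkpointState.put(splitId, pos); // mark used by the next invocation
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> source = new HashMap<>();
        source.put("shard-0", List.of("a", "b", "c", "d", "e"));
        StatefulCheckpointSketch reader = new StatefulCheckpointSketch(source);
        System.out.println(reader.readBatch("shard-0", 2)); // [a, b]
        System.out.println(reader.readBatch("shard-0", 2)); // [c, d]
        System.out.println(reader.readBatch("shard-0", 2)); // [e]
    }
}
```

In a real pipeline the runner would checkpoint that state, so the mark would survive a failure; the in-memory map here only illustrates the resume logic.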

-Max

On 27.07.20 21:43, Sunny, Mani Kolbe wrote:
Thank you Max. Is there reference code somewhere for implementing checkpointing
for BoundedReadFromUnboundedSource? I couldn’t quite figure out where to get
the restored checkpointMark from. It appears to be specific to runner implementations.

-----Original Message-----
From: Maximilian Michels <[email protected]>
Sent: Monday, July 27, 2020 3:04 PM
To: [email protected]; Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used

CAUTION: This email originated from outside of D&B. Please do not click links 
or open attachments unless you recognize the sender and know the content is safe.


Hi Mani,

Just to let you know I think it would make sense to either

(1) implement checkpointing for BoundedReadFromUnboundedSource or
(2) throw an error in case of a provided checkpoint mark

As you pointed out, ignoring it, as we currently do, does not seem like a
viable solution.
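Option (2) amounts to a guard like the following toy sketch (plain Java; createReader and the Long offset standing in for a checkpoint mark are hypothetical, not Beam APIs). A restored mark is rejected loudly instead of being silently dropped.

```java
/**
 * Toy sketch of option (2): reject a provided checkpoint mark rather than
 * ignoring it. createReader and the Long "mark" are hypothetical stand-ins.
 */
public class RejectCheckpointSketch {

    public static String createReader(Long restoredMark) {
        if (restoredMark != null) {
            // Restarting from scratch would re-emit records and could violate
            // exactly-once expectations, so fail instead.
            throw new UnsupportedOperationException(
                "Resuming from a checkpoint mark is not supported by this read");
        }
        return "reader starting from the source's initial position";
    }

    public static void main(String[] args) {
        System.out.println(createReader(null));
        try {
            createReader(42L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```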

-Max

On 21.07.20 16:16, Sunny, Mani Kolbe wrote:
Hi Alexey,

I did explore that route. The problem there is identifying which
timestamp to use. While processing records, you can capture the
timestamp. This cannot be processing time; it has to be the event time of the
Kinesis record. As far as I can see, the event time of a Kinesis record is
generated from the ApproximateArrivalTimestamp field of the GetRecords API
call. According to the Kinesis docs [1]: "There are no guarantees about the
time stamp accuracy, or that the time stamp is always increasing. For
example, records in a shard or across a stream might have time stamps that are
out of order."

So there is a chance of skipping records if the timestamps captured at the
checkpoint were out of order. Also, there is no provision for providing a
per-shard timestamp when initializing KinesisReader.
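To make the out-of-order concern concrete, here is a toy model in plain Java (not KinesisIO code; class, method names and timestamps are hypothetical). It takes the largest arrival timestamp read so far as a would-be checkpoint and lists the unread records whose timestamps are not later than it. Whether a time-based restart then skips those records or re-delivers already-read ones depends on how the reader positions itself, but either way the timestamp does not identify a position in the shard the way a per-shard sequence number does.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical names): arrival timestamps in shard order are not
 * monotonic, so a timestamp cannot separate read records from unread ones.
 */
public class TimestampCheckpointSketch {

    // Largest arrival timestamp among the records already read.
    public static long checkpointTimestamp(long[] readTimestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : readTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Indexes of unread records whose timestamp is not after the checkpoint timestamp.
    public static List<Integer> atRisk(long[] unreadTimestamps, long checkpointTs) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < unreadTimestamps.length; i++) {
            if (unreadTimestamps[i] <= checkpointTs) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] read = {100, 105};        // timestamps of records already read
        long[] unread = {103, 110, 104}; // next records in the shard, in sequence order
        long ts = checkpointTimestamp(read);
        System.out.println("checkpoint timestamp = " + ts);                    // 105
        System.out.println("at-risk unread records = " + atRisk(unread, ts)); // [0, 2]
    }
}
```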

[1]
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

Regards,

Mani

*From:* Alexey Romanenko <[email protected]>
*Sent:* Tuesday, July 21, 2020 2:01 PM
*To:* [email protected]
*Subject:* Re: Unbounded sources unable to recover from checkpointMark
when withMaxReadTime() is used

Hi Mani,

Knowing when your last run was stopped (since you use batch mode),
could you leverage “withInitialPositionInStream()” or
“withInitialTimestampInStream()” for KinesisIO in this case?

Alexey



     On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]
     <mailto:[email protected]>> wrote:

     Hi Max,

     Thank you for your reply. Our use case is to run a batch job against
     a Kinesis source. Our downstream systems are still in batch mode, so
     this application will read from the Kinesis source periodically and
     generate batched outputs. Without the ability to resume from a
     checkpoint, it will read the entire stream every time.

     Regards,
     Mani

     -----Original Message-----
     From: Maximilian Michels <[email protected] <mailto:[email protected]>>
     Sent: Tuesday, July 21, 2020 11:38 AM
     To:[email protected] <mailto:[email protected]>
     Cc: Sunny, Mani Kolbe <[email protected] <mailto:[email protected]>>
     Subject: Re: Unbounded sources unable to recover from checkpointMark
     when withMaxReadTime() is used


     Hi Mani,

     BoundedReadFromUnboundedSource was originally intended to be used in
     batch pipelines. In batch, runners typically do not perform
     checkpointing. In case of failures, they re-run the entire pipeline.

     Keep in mind that, even with checkpointing, reading for a finite
     time in the processing time domain from an unbounded source rarely
     gives consistent results across runs.
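As an illustration of why a processing-time bound rarely gives consistent results, here is a toy model (plain Java, hypothetical names, made-up latencies) of a read that stops once a fixed time budget is used up. The input is identical in both calls; only the per-record fetch latency differs, yet the number of records read changes.

```java
/**
 * Toy model (hypothetical): a read bounded by a processing-time budget.
 * Same records, different fetch latencies, different amount of data read.
 */
public class MaxReadTimeSketch {

    // Counts how many records can be fetched, one by one, within the budget.
    public static int recordsReadWithin(long budgetMillis, long[] fetchLatencyMillis) {
        long elapsed = 0;
        int count = 0;
        for (long latency : fetchLatencyMillis) {
            if (elapsed + latency > budgetMillis) {
                break; // the next fetch would overrun the budget
            }
            elapsed += latency;
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        long budget = 100;
        System.out.println(recordsReadWithin(budget, new long[] {20, 20, 20, 20, 20, 20})); // 5
        System.out.println(recordsReadWithin(budget, new long[] {30, 30, 30, 30, 30, 30})); // 3
    }
}
```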

     However, ignoring the checkpoint looks problematic. We may want to
     fail during checkpointing to prevent violating correctness (e.g.
     exactly-once semantics).

     -Max

     On 21.07.20 11:36, Sunny, Mani Kolbe wrote:

         Observed on v2.22.0

         When withMaxReadTime() is used, Beam creates a
         BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
         BoundedReadFromUnboundedSource is responsible for reading
         records from the source. You can see that this class doesn't check
         whether a recoverable checkpoint exists. Instead, it always creates
         the Reader with checkpointMark set to null [2].
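The reported behavior can be reduced to a toy model in plain Java. createReader(mark) below only mirrors the shape of UnboundedSource#createReader(options, checkpointMark), where a null mark means "start from the initial position"; the method names, the integer offset used as a mark, and the in-memory stream are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (hypothetical names): passing null instead of the restored mark
 * makes every run start over from the initial position.
 */
public class NullCheckpointSketch {

    static final List<Integer> STREAM = List.of(1, 2, 3, 4, 5, 6);

    // Returns the offset the reader starts from; null means the initial position.
    public static int createReader(Integer checkpointMark) {
        return checkpointMark == null ? 0 : checkpointMark;
    }

    // Reads up to n records starting at the given offset.
    public static List<Integer> read(int start, int n) {
        return new ArrayList<>(STREAM.subList(start, Math.min(start + n, STREAM.size())));
    }

    public static void main(String[] args) {
        // First run reads three records; its checkpoint mark would be offset 3.
        int firstStart = createReader(null);
        List<Integer> firstRun = read(firstStart, 3);
        int mark = firstStart + firstRun.size();

        // Reported behavior: the mark is dropped and null is passed again.
        System.out.println("ignoring mark: " + read(createReader(null), 3)); // [1, 2, 3]
        // Expected behavior: the restored mark is passed to createReader.
        System.out.println("using mark:    " + read(createReader(mark), 3)); // [4, 5, 6]
    }
}
```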

         Is this the desired behavior? More importantly, do you guys recommend
         using withMaxReadTime() in a production setup, or is it more for demo
         use cases?

         I have created a JIRA issue for this (BEAM-104934,
         https://issues.apache.org/jira/browse/BEAM-104934) and am looking
         to work on a patch for it, but would like confirmation on the
         above first.

         Regards,

         Mani

         References:

         [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205

         [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
