Hi Mani,

Since you run in batch mode and know when your last run stopped, could you 
use “withInitialPositionInStream()” or “withInitialTimestampInStream()” 
for KinesisIO in this case?
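
A minimal sketch of that idea, in case it helps: keep the time of the last 
run in a small marker file and use it as the initial timestamp of the next 
run. The LastRunMarker class, its two methods and the marker file location 
are hypothetical names made up for illustration, not part of Beam. The 
KinesisIO call appears only as a comment, assuming the Joda-Time Instant 
that Beam's Java SDK uses.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;

/** Hypothetical helper (not part of Beam): remembers when the previous batch run took place. */
public class LastRunMarker {

    /** Returns the stored time, or the fallback when no marker file exists yet. */
    public static Instant loadOrDefault(Path marker, Instant fallback) throws IOException {
        if (!Files.exists(marker)) {
            return fallback;
        }
        String text = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim();
        return Instant.parse(text);
    }

    /** Stores the time from which the next run should resume reading. */
    public static void save(Path marker, Instant resumeFrom) throws IOException {
        Files.write(marker, resumeFrom.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Assumed marker location; a real job would more likely use durable shared storage.
        Path marker = Files.createTempDirectory("kinesis-batch").resolve("last-run.txt");

        // First run: no marker yet, so fall back to an arbitrary start time.
        Instant start = loadOrDefault(marker, Instant.parse("2020-07-01T00:00:00Z"));
        Instant runStartedAt = Instant.now();

        // In the pipeline, the start time would be handed to KinesisIO, roughly:
        //   KinesisIO.read()
        //       .withInitialTimestampInStream(new org.joda.time.Instant(start.toEpochMilli()))
        //       .withMaxReadTime(...)
        // (the rest of the read configuration is omitted here)
        System.out.println("Reading records from " + start);

        // After the job succeeds, remember where to resume next time.
        save(marker, runStartedAt);
        System.out.println("Next run starts from " + loadOrDefault(marker, start));
    }
}
```

Which timestamp to store is a design choice. Storing the start time of the 
run, as above, will likely re-read some records on the next run, so the 
downstream batch step may need to deduplicate. It also assumes each run 
catches up with the stream before withMaxReadTime() expires.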

Alexey

> On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]> wrote:
> 
> Hi Max,
> 
> Thank you for your reply. Our use case is to run a batch job against a 
> Kinesis source. Our downstream systems are still in batch mode, so this 
> application will read from the Kinesis source periodically and generate 
> batched outputs. Without the ability to resume from a checkpoint, it will 
> read the entire stream every time.
> 
> Regards,
> Mani
> 
> -----Original Message-----
> From: Maximilian Michels <[email protected]>
> Sent: Tuesday, July 21, 2020 11:38 AM
> To: [email protected]
> Cc: Sunny, Mani Kolbe <[email protected]>
> Subject: Re: Unbounded sources unable to recover from checkpointMark when 
> withMaxReadTime() is used
> 
> Hi Mani,
> 
> BoundedReadFromUnboundedSource was originally intended to be used in batch 
> pipelines. In batch, runners typically do not perform checkpointing. In case 
> of failures, they re-run the entire pipeline.
> 
> Keep in mind that, even with checkpointing, reading for a finite time in the 
> processing time domain from an unbounded source rarely gives consistent 
> results across runs.
> 
> However, ignoring the checkpoint looks problematic. We may want to fail 
> during checkpointing to prevent violating correctness (e.g. exactly-once 
> semantics).
> 
> -Max
> 
> On 21.07.20 11:36, Sunny, Mani Kolbe wrote:
>> Observed on v2.22.0
>> 
>> When withMaxReadTime() is used, Beam creates a 
>> BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in 
>> BoundedReadFromUnboundedSource is responsible for reading records 
>> from the source. This class doesn't check whether a recoverable 
>> checkpoint exists. Instead, it always creates the Reader with 
>> checkpointMark set to null [2].
>> 
>> Is this the desired behavior? More importantly, do you guys recommend 
>> using withMaxReadTime() in a production setup? Or is this more for demo 
>> use cases?
>> 
>> I have created a jira for the same (BEAM-104934
>> <https://issues.apache.org/jira/browse/BEAM-104934>) and looking to work 
>> on a patch for the same. But would like a confirmation on the above first.
>> 
>> Regards,
>> 
>> Mani
>> 
>> Reference:
>> 
>> [1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
>> 
>> [2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
