Hi Alexey,

I did explore that route. The problem there is identifying which timestamp to
use. While processing records, you can capture the timestamp, but it has to be
the event time on the Kinesis record, not processing time. As far as I can see,
the event time on a Kinesis record comes from the ApproximateArrivalTimestamp
field returned by the GetRecords API call. According to the Kinesis docs [1]:
"There are no guarantees about the time stamp accuracy, or that the time stamp
is always increasing. For example, records in a shard or across a stream might
have time stamps that are out of order."

So records can be skipped if the timestamps captured at checkpoint time were
out of order. Also, there is no way to provide a per-shard timestamp when
initializing KinesisReader.
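To make the skipping risk concrete, here is a toy Java model. The class, the record values and both resume rules are hypothetical, not KinesisIO internals. It assumes, purely for illustration, that a timestamp-based restart delivers every record whose arrival timestamp is at or after the saved one, while a sequence-number restart delivers every record after the last sequence number read. Sequence numbers are modeled as longs for simplicity; real Kinesis sequence numbers are strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not KinesisIO code); shard records are listed in sequence order. */
public class TimestampResumeSketch {

  /** One shard record: sequence number plus ApproximateArrivalTimestamp in epoch millis. */
  static final class ShardRecord {
    final long sequenceNumber;
    final long arrivalMillis;

    ShardRecord(long sequenceNumber, long arrivalMillis) {
      this.sequenceNumber = sequenceNumber;
      this.arrivalMillis = arrivalMillis;
    }
  }

  /** Assumed timestamp-based resume: every record whose arrival time is >= the saved time. */
  static List<Long> resumeByTimestamp(List<ShardRecord> shard, long savedArrivalMillis) {
    List<Long> delivered = new ArrayList<>();
    for (ShardRecord r : shard) {
      if (r.arrivalMillis >= savedArrivalMillis) {
        delivered.add(r.sequenceNumber);
      }
    }
    return delivered;
  }

  /** Sequence-based resume: every record strictly after the last sequence number read. */
  static List<Long> resumeBySequence(List<ShardRecord> shard, long lastSequenceNumber) {
    List<Long> delivered = new ArrayList<>();
    for (ShardRecord r : shard) {
      if (r.sequenceNumber > lastSequenceNumber) {
        delivered.add(r.sequenceNumber);
      }
    }
    return delivered;
  }

  public static void main(String[] args) {
    // Record 3 has an earlier arrival timestamp (103) than record 2 (105).
    List<ShardRecord> shard = List.of(
        new ShardRecord(1, 100), new ShardRecord(2, 105),
        new ShardRecord(3, 103), new ShardRecord(4, 110));
    // The previous run read records 1 and 2, so it saved timestamp 105 / sequence number 2.
    System.out.println("by timestamp: " + resumeByTimestamp(shard, 105)); // [2, 4]
    System.out.println("by sequence:  " + resumeBySequence(shard, 2)); // [3, 4]
  }
}
```

Under these assumptions the timestamp-based restart never returns record 3 (and re-delivers record 2), whereas the sequence-number restart returns exactly the unread records.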

[1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

Regards,
Mani

From: Alexey Romanenko <[email protected]>
Sent: Tuesday, July 21, 2020 2:01 PM
To: [email protected]
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used

CAUTION: This email originated from outside of D&B. Please do not click links 
or open attachments unless you recognize the sender and know the content is 
safe.

Hi Mani,

Knowing when your last run was stopped (since you use batch mode), could you
leverage “withInitialPositionInStream()” or “withInitialTimestampInStream()” 
for KinesisIO in this case?
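A minimal sketch of this suggestion, assuming the job itself records when each run stopped in a local marker file. LastRunMarker and the file name are hypothetical and not part of Beam. On the first run there is no marker, so the read would start from TRIM_HORIZON; on later runs it would start at the recorded instant.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helper (not part of Beam): remembers when the previous batch run stopped. */
public class LastRunMarker {

  private final Path markerFile;

  public LastRunMarker(Path markerFile) {
    this.markerFile = markerFile;
  }

  /** The stop time written by the previous run, or empty on the very first run. */
  public Optional<Instant> previousStop() {
    try {
      if (!Files.exists(markerFile)) {
        return Optional.empty();
      }
      return Optional.of(Instant.parse(Files.readString(markerFile).trim()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Called at the end of a run so that the next run knows where to resume. */
  public void recordStop(Instant stoppedAt) {
    try {
      Files.writeString(markerFile, stoppedAt.toString());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** The start position the next run would choose, as a readable label. */
  public String nextStartPosition() {
    return previousStop().map(t -> "AT_TIMESTAMP " + t).orElse("TRIM_HORIZON");
  }

  public static void main(String[] args) {
    Path file = Path.of(System.getProperty("java.io.tmpdir"), "kinesis-last-stop.txt");
    LastRunMarker marker = new LastRunMarker(file);
    System.out.println("next run starts at: " + marker.nextStartPosition());
    marker.recordStop(Instant.now());
  }
}
```

The two labels correspond to KinesisIO.read().withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) and .withInitialTimestampInStream(...); the latter takes a Joda-Time Instant, e.g. new org.joda.time.Instant(t.toEpochMilli()). A single stop time is still exposed to out-of-order ApproximateArrivalTimestamp values, which is the concern raised in the reply above.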

Alexey


On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe 
<[email protected]> wrote:

Hi Max,

Thank you for your reply. Our use case is to run a batch job against a Kinesis
source, because our downstream systems are still batch-based. The application
reads from the Kinesis source periodically and generates batched outputs.
Without the ability to resume from a checkpoint, it will read the entire stream
every time.

Regards,
Mani

-----Original Message-----
From: Maximilian Michels <[email protected]>
Sent: Tuesday, July 21, 2020 11:38 AM
To: [email protected]
Cc: Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when 
withMaxReadTime() is used

Hi Mani,

BoundedReadFromUnboundedSource was originally intended to be used in batch 
pipelines. In batch, runners typically do not perform checkpointing. In case of 
failures, they re-run the entire pipeline.

Keep in mind that, even with checkpointing, reading for a finite time in the 
processing time domain from an unbounded source rarely gives consistent results 
across runs.
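A toy illustration of why a processing-time bound varies between runs. The latencies are made-up values and recordsReadWithin is a hypothetical helper: the same five records and the same 100 ms budget yield different record counts depending only on how long each fetch happens to take.

```java
/** Toy model (hypothetical): how many records fit into a fixed processing-time budget. */
public class ProcessingTimeBudget {

  /** latenciesMillis[i] is the simulated wall-clock time spent fetching record i. */
  static int recordsReadWithin(long[] latenciesMillis, long budgetMillis) {
    long elapsed = 0;
    int read = 0;
    for (long latency : latenciesMillis) {
      if (elapsed + latency > budgetMillis) {
        break; // the max read time expires before this record arrives
      }
      elapsed += latency;
      read++;
    }
    return read;
  }

  public static void main(String[] args) {
    // Same five records, same 100 ms budget, different network/throttling conditions.
    long[] fastRun = {10, 10, 10, 10, 10};
    long[] slowRun = {10, 40, 30, 30, 10};
    System.out.println(recordsReadWithin(fastRun, 100)); // 5
    System.out.println(recordsReadWithin(slowRun, 100)); // 3
  }
}
```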

However, ignoring the checkpoint looks problematic. We may want to fail during 
checkpointing to prevent violating correctness (e.g. exactly-once semantics).

-Max

On 21.07.20 11:36, Sunny, Mani Kolbe wrote:

Observed on v2.22.0

When withMaxReadTime() is used, Beam creates a
BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in
BoundedReadFromUnboundedSource is responsible for reading records from the
source. This class doesn't check whether a recoverable checkpoint exists;
instead, it always creates the reader with checkpointMark set to null [2].
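The effect can be modeled in a few lines of Java. All names here are hypothetical; the Reader constructor plays the role that UnboundedSource.createReader(options, checkpointMark) plays in Beam, where null means there is no checkpoint to resume from. Passing null always restarts at the beginning of the stream, so a second run re-reads what the first run already consumed; passing the saved mark resumes where it stopped.

```java
import java.util.List;

/** Toy model (hypothetical names) of a reader that can resume from a checkpoint mark. */
public class CheckpointResumeSketch {

  /** Checkpoint mark: offset of the next record the reader would return. */
  static final class OffsetMark {
    final int nextOffset;

    OffsetMark(int nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  /** Reader over an in-memory "stream"; a null mark means there is nothing to resume from. */
  static final class Reader {
    private final List<String> stream;
    private int offset;

    Reader(List<String> stream, OffsetMark mark) {
      this.stream = stream;
      this.offset = (mark == null) ? 0 : mark.nextOffset;
    }

    /** Reads at most maxRecords records, like one bounded batch run. */
    List<String> readUpTo(int maxRecords) {
      int end = Math.min(stream.size(), offset + maxRecords);
      List<String> batch = List.copyOf(stream.subList(offset, end));
      offset = end;
      return batch;
    }

    /** Snapshot of the current position, to be persisted between runs. */
    OffsetMark checkpoint() {
      return new OffsetMark(offset);
    }
  }

  public static void main(String[] args) {
    List<String> stream = List.of("a", "b", "c", "d", "e");

    Reader firstRun = new Reader(stream, null);
    System.out.println("run 1: " + firstRun.readUpTo(2)); // [a, b]
    OffsetMark saved = firstRun.checkpoint();

    // Behaviour described in the report: the saved mark is ignored (null is passed).
    System.out.println("run 2, mark ignored: " + new Reader(stream, null).readUpTo(2)); // [a, b]
    // Behaviour when the saved mark is honoured.
    System.out.println("run 2, mark used:    " + new Reader(stream, saved).readUpTo(2)); // [c, d]
  }
}
```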

Is this the intended behavior? More importantly, do you recommend using
withMaxReadTime() in a production setup, or is it meant more for demo use cases?

I have created a JIRA for this (BEAM-104934
<https://issues.apache.org/jira/browse/BEAM-104934>) and am looking to work on
a patch, but would like confirmation on the above first.

Regards,

Mani

References:

[1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205

[2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
