Hi Alexey,

I did explore that route. The problem there is identifying which timestamp to use. While processing records, you can capture a timestamp, but it cannot be the processing time; it has to be the event time on the Kinesis record. As far as I can see, the event time on a Kinesis record is derived from the ApproximateArrivalTimestamp field returned by the GetRecords API call. According to the Kinesis docs [1]:

"There are no guarantees about the time stamp accuracy, or that the time stamp is always increasing. For example, records in a shard or across a stream might have time stamps that are out of order."
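
To illustrate the concern, here is a minimal sketch with made-up values. It assumes a simplified resume rule (on restart, keep only records whose arrival timestamp is later than the one saved by the previous run) and does not model how Kinesis itself resolves a timestamp-based starting position. Rec and skippedOnResume are hypothetical names, not part of Beam or the AWS SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TimestampResumeSketch {

    /** Hypothetical stand-in for a Kinesis record: position in the shard plus arrival time. */
    static final class Rec {
        final int sequence;
        final long arrivalMillis;

        Rec(int sequence, long arrivalMillis) {
            this.sequence = sequence;
            this.arrivalMillis = arrivalMillis;
        }
    }

    /**
     * Returns the sequence numbers of not-yet-read records that would be missed
     * if the next run kept only records with a timestamp later than the saved one.
     * ASSUMPTION: this filter is a simplification, not the Kinesis iterator logic.
     */
    static List<Integer> skippedOnResume(List<Rec> shard, int alreadyRead) {
        // "Checkpoint": the largest arrival timestamp seen during the first run.
        long saved = Long.MIN_VALUE;
        for (int i = 0; i < alreadyRead; i++) {
            saved = Math.max(saved, shard.get(i).arrivalMillis);
        }
        // Any unread record stamped at or before the saved time is lost on resume.
        List<Integer> skipped = new ArrayList<>();
        for (int i = alreadyRead; i < shard.size(); i++) {
            Rec r = shard.get(i);
            if (r.arrivalMillis <= saved) {
                skipped.add(r.sequence);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Record 3 arrives after record 2 in shard order but carries an earlier timestamp.
        List<Rec> shard = Arrays.asList(
            new Rec(1, 100L), new Rec(2, 105L), new Rec(3, 103L), new Rec(4, 110L));
        System.out.println(skippedOnResume(shard, 2)); // prints [3]
    }
}
```

With strictly increasing timestamps the same method returns an empty list, which is why the ordering guarantee matters here.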

So there is a chance of skipping records if the timestamps captured at checkpoint time were out of order. Also, there is no provision for supplying a per-shard timestamp when initializing KinesisReader.

[1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

Regards,
Mani

From: Alexey Romanenko <[email protected]>
Sent: Tuesday, July 21, 2020 2:01 PM
To: [email protected]
Subject: Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used

Hi Mani,

Knowing when your last run was stopped (since you use batch mode), could you leverage “withInitialPositionInStream()” or “withInitialTimestampInStream()” for KinesisIO in this case?

Alexey

On 21 Jul 2020, at 13:40, Sunny, Mani Kolbe <[email protected]> wrote:

Hi Max,

Thank you for your reply. Our use case is to run a batch job against a Kinesis source. Our downstream systems are still in batch mode, so this application will read from the Kinesis source periodically and generate batched outputs. Without the ability to resume from a checkpoint, it will read the entire stream every time.

Regards,
Mani

-----Original Message-----
From: Maximilian Michels <[email protected]>
Sent: Tuesday, July 21, 2020 11:38 AM
To: [email protected]
Cc: Sunny, Mani Kolbe <[email protected]>
Subject: Re: Unbounded sources unable to recover from checkpointMark when withMaxReadTime() is used

Hi Mani,

BoundedReadFromUnboundedSource was originally intended to be used in batch pipelines. In batch, runners typically do not perform checkpointing. In case of failures, they re-run the entire pipeline.

Keep in mind that, even with checkpointing, reading for a finite time in the processing time domain from an unbounded source rarely gives consistent results across runs. However, ignoring the checkpoint looks problematic. We may want to fail during checkpointing to prevent violating correctness (e.g. exactly-once semantics).

-Max

On 21.07.20 11:36, Sunny, Mani Kolbe wrote:

Observed on v2.22.0.

When withMaxReadTime() is used, Beam creates a BoundedReadFromUnboundedSource [1]. The ReadFn<T> class in BoundedReadFromUnboundedSource is responsible for reading records from the source. You can see that this class does not verify whether a recoverable checkpoint exists. Instead, it always creates the Reader with checkpointMark set to null [2]. Is this the desired behavior? More importantly, do you guys recommend using withMaxReadTime() in a production setup, or is it more for demo use cases?

I have created a Jira for this (BEAM-104934, https://issues.apache.org/jira/browse/BEAM-104934) and am looking to work on a patch for it, but I would like confirmation on the above first.

Regards,
Mani

Reference:
[1] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L205
[2] https://github.com/apache/beam/blob/v2.22.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L193
