Thanks, it was working fine with:

bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
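[As Arvid explains further down the thread, the consumer's STREAM_INITIAL_POSITION only applies when the job starts with no restored state; when the job is resubmitted with -s <checkpoint path>, the Kinesis consumer resumes from the shard positions stored in that checkpoint rather than from LATEST. The following is a minimal sketch of such a consumer setup, not the poster's actual job: the stream name, region, checkpoint interval, and wiring are assumed placeholder values.]

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisRestoreSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // example interval: checkpoint every 60s

        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
        // Only consulted on a fresh start with no restored state; a -s restore
        // resumes from the checkpointed shard sequence numbers instead.
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        DataStream<String> records = env.addSource(
                new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig));

        records.print();
        env.execute("kinesis-restore-sketch");
    }
}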
On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com> wrote:

> Hi Arvid,
>
> Thanks for your response. I did not restart from the checkpoint. I assumed
> Flink would look for a checkpoint upon restart automatically.
>
> Should I restart like below?
>
> bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \
>
> Thanks,
> Vijay
>
> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Vijay,
>>
>> Edit: after re-reading your message: are you sure that you restarted from a
>> checkpoint/savepoint? If you just start the application anew and use the
>> LATEST initial position, this is the expected behavior.
>>
>> --- original intended answer, if you did restart from a checkpoint ---
>>
>> This is definitely not the expected behavior.
>>
>> To exclude certain error sources:
>> - Could you double-check whether this also happens if you don't use
>>   unaligned checkpoints? (I don't really think unaligned checkpoints are the
>>   cause, but it's better to be sure and to reduce the possible error sources.)
>> - Can you still see the missing messages in Kinesis?
>> - Could you extract all INFO log statements from
>>   org.apache.flink.streaming.connectors.kinesis and attach them here?
>> - How long did you wait before recovering?
>>
>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> We are trying to make sure we are not losing data when the Kinesis
>>> consumer is down.
>>>
>>> The Kinesis streaming job has the following checkpointing properties:
>>>
>>> // checkpoint every X msecs
>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>
>>> // enable externalized checkpoints which are retained after job cancellation
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>
>>> // enables the experimental unaligned checkpoints
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>
>>> // checkpoint path
>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>
>>> 1) We killed the Kinesis job.
>>> 2) Sent messages to KDS while the consumer was down.
>>> 3) Restarted the Flink consumer; the messages sent during the consumer
>>>    downtime were never ingested (data loss).
>>> 4) Re-sent messages to KDS while the consumer was up; those messages
>>>    ingested fine.
>>>
>>> How can I avoid data loss for #3?
>>>
>>> From the logs:
>>>
>>> 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using
>>> application-defined state backend: File State Backend (checkpoints:
>>> 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE,
>>> fileStateThreshold: -1)
>>>
>>> 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>> 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job
>>> 8943d16e22b8aaf65d6b9e2b8bd54113.
>>> 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
>>> Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>
>>> Thanks,
>>> Vijay
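[To make the distinction in the thread concrete: automatic checkpoint recovery only covers task failures within a still-running job (governed by the restart strategy), whereas a job that is cancelled or killed and then resubmitted starts with empty state unless a retained checkpoint or savepoint is passed explicitly via bin/flink run -s <path>. The sketch below illustrates this, not the poster's job: the checkpoint interval, S3 path, restart-strategy parameters, and trivial pipeline are assumed example values.]

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRecoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000); // example interval
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints", true)); // placeholder path

        // Keep completed checkpoints on cancellation so a chk-NNN directory remains
        // available for a manual restore with: bin/flink run -s <checkpoint path>
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Automatic recovery applies only within the same job run: restart up to
        // 3 times, 10 seconds apart, restoring from the latest completed checkpoint.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // Trivial pipeline so the sketch is executable as-is.
        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-recovery-sketch");
    }
}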