Thanks, it worked fine with: bin/flink run -s
s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
\
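
For readers following along: as the quoted messages below establish, a freshly submitted job does not pick up a retained checkpoint by itself, so the path has to be passed with -s. The following is only a minimal sketch of choosing the most recent chk-<n> directory from a listing and assembling that command. The class name, the chk-383 entry, and the jar name your-job.jar are hypothetical placeholders, and obtaining the listing from S3 is left out.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LatestCheckpoint {
    // Retained checkpoint directories end in chk-<n>; a higher n is more recent.
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)/?$");

    /** Returns the path with the highest chk-<n> suffix, or empty if none match. */
    public static Optional<String> latest(List<String> paths) {
        String best = null;
        long bestId = -1;
        for (String p : paths) {
            Matcher m = CHK.matcher(p);
            if (m.find()) {
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    best = p;
                }
            }
        }
        return Optional.ofNullable(best);
    }

    /** Builds the restore command line; jarPath is a placeholder for the job jar. */
    public static String restoreCommand(String checkpointPath, String jarPath) {
        return "bin/flink run -s " + checkpointPath + " " + jarPath;
    }

    public static void main(String[] args) {
        // Hypothetical listing; only the chk-384 path appears in this thread.
        List<String> found = List.of(
            "s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-383/",
            "s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/");
        latest(found).ifPresent(p -> System.out.println(restoreCommand(p, "your-job.jar")));
    }
}
```

Comparing the numeric suffix rather than the raw string avoids ordering chk-9 after chk-10.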

On Thu, Apr 8, 2021 at 11:42 AM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Arvid,
>
> Thanks for your response. I did not restart from the checkpoint. I assumed
> Flink would look for a checkpoint upon restart automatically.
>
> *Should I restart like below?*
>
> bin/flink run  -s
> s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/
> \
>
> Thanks,
> Vijay
>
> On Thu, Apr 8, 2021 at 12:52 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Vijay,
>>
>> edit: After re-reading your message: are you sure that you restart from a
>> checkpoint/savepoint? If you just start the application anew and use LATEST
>> initial position, this is the expected behavior.
>>
>> --- original intended answer if you restart from checkpoint
>>
>> This is definitely not the expected behavior.
>>
>> To exclude certain error sources:
>> - Could you double-check if this is also happening if you don't use
>> unaligned checkpoints? (I don't really think this is because of unaligned
>> checkpoint, but it's better to be sure and we want to reduce the possible
>> error sources)
>> - Can you see the missing messages still in Kinesis?
>> - Could you extract all log INFO statements from
>> org.apache.flink.streaming.connectors.kinesis and attach them here?
>> - How long did you wait with recovery?
>>
>>
>>
>> On Wed, Apr 7, 2021 at 8:03 PM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> We are trying to make sure we do not lose data when the Kinesis consumer
>>> is down.
>>>
>>> Our Kinesis streaming job has the following checkpointing properties:
>>>
>>> // checkpoint every X msecs
>>> env.enableCheckpointing(Conf.getFlinkCheckpointInterval());
>>>
>>> // enable externalized checkpoints which are retained after job cancellation
>>> env.getCheckpointConfig().enableExternalizedCheckpoints(
>>>     CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>>
>>> // allow job recovery fallback to checkpoint when there is a more recent savepoint
>>> env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());
>>>
>>> // enables the experimental unaligned checkpoints
>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>
>>> // checkpoint path
>>> env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
>>>
>>> 1) We killed the Kinesis Job
>>> 2) Sent messages to KDS while Consumer was down.
>>> 3) Restarted the Flink consumer; *messages which were sent during the
>>> consumer down period were never ingested (data loss).*
>>> 4) Re-sent messages to KDS while the consumer was still up. These messages
>>> were ingested fine.
>>>
>>> *How can I avoid the data loss in #3?*
>>>
>>> From Logs:
>>>
>>>
>>> 2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)
>>> 2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
>>> 2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
>>> 2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).
>>>
>>> Thanks,
>>> Vijay
>>>
>>
