Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-09 Thread Vijayendra Yadav
Thank you, it helped.


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

If you don't specify a checkpoint, then Flink assumes you want to start from
scratch (e.g., you had a bug in your business logic and need to start
completely without state).

If there is any failure and Flink restarts the job automatically, it will
always pick up from the latest checkpoint [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html#recovery
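
For reference, a minimal sketch of that distinction (not from this thread; the
checkpoint interval and retry values are purely illustrative). With a restart
strategy configured, a job that fails is restarted by Flink itself and restored
from the latest completed checkpoint; a job that was cancelled or killed and is
resubmitted later only resumes from a retained checkpoint if you pass it
explicitly with "bin/flink run -s <checkpoint-path>".

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AutomaticRecoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (illustrative interval).
        env.enableCheckpointing(60_000L);

        // On failure, Flink restarts the job up to 3 times, 10 seconds apart,
        // restoring from the latest completed checkpoint each time.
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // A manual resubmission after cancel/kill does NOT pick up a checkpoint
        // automatically; start it with: bin/flink run -s <retained-checkpoint-path>

        // ... add sources/sinks here, then call env.execute("job name");
    }
}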


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Thanks, it was working fine with:

bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Vijayendra Yadav
Hi Arvid,

Thanks for your response. I did not restart from the checkpoint. I assumed
Flink would look for a checkpoint upon restart automatically.

Should I restart like below?

bin/flink run -s s3://bucketxx-app/flink/checkpoint/iv/f1ef373e27203907d2d568bca31e6556/chk-384/ \

Thanks,
Vijay


Re: FLINK Kinesis consumer Checkpointing data loss

2021-04-08 Thread Arvid Heise
Hi Vijay,

Edit: after re-reading your message, are you sure that you restarted from a
checkpoint/savepoint? If you just start the application anew and use the
LATEST initial position, this is the expected behavior.
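
(For reference, a minimal sketch of where that initial position is set; the
stream name and region are illustrative, not taken from this thread. The
configured position only applies when the job starts without restored state;
when the job is restored from a checkpoint/savepoint, the consumer continues
from the shard sequence numbers stored in that state.)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class InitialPositionSketch {
    public static FlinkKinesisConsumer<String> buildSource() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2"); // illustrative region

        // LATEST: a fresh start (no restored state) begins at the tip of the
        // stream, so anything published while the consumer was down is skipped.
        // TRIM_HORIZON would instead re-read from the start of the retention window.
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // When restoring from a checkpoint/savepoint, the restored sequence
        // numbers take precedence over this configured initial position.
        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config);
    }
}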

--- original intended answer, assuming you restarted from a checkpoint

This is definitely not the expected behavior.

To exclude certain error sources:
- Could you double-check whether this also happens when you don't use unaligned
  checkpoints? (I don't really think unaligned checkpoints are the cause, but
  it's better to be sure, and we want to reduce the possible error sources.)
- Can you still see the missing messages in Kinesis?
- Could you extract all INFO log statements from
  org.apache.flink.streaming.connectors.kinesis and attach them here?
- How long did you wait before recovering?


FLINK Kinesis consumer Checkpointing data loss

2021-04-07 Thread Vijayendra Yadav
Hi Team,

We are trying to make sure we are not losing data when the Kinesis consumer
is down.

The Kinesis streaming job has the following checkpointing properties:


// checkpoint every X msecs
env.enableCheckpointing(Conf.getFlinkCheckpointInterval());

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery fallback to checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(Conf.isFlinkCheckpointPreferCheckPoint());

// enables the experimental unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

// checkpoint path
env.setStateBackend(new FsStateBackend(Conf.getFlinkCheckPointPath(), true));
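
(For context, a minimal sketch of how such a configuration ties into the
Kinesis source; the stream name, region, and print sink are illustrative and
not our actual code. The consumer's per-shard sequence numbers are part of the
checkpointed state, so a retained checkpoint is what carries the position at
which reading left off.)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisCheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000L); // illustrative interval
        env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints", true));

        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-west-2"); // illustrative region

        // The consumer snapshots its per-shard sequence numbers into every checkpoint;
        // restoring from a retained checkpoint resumes reading from exactly there.
        env.addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), config))
            .print();

        env.execute("kinesis-checkpointing-sketch");
    }
}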

1) We killed the Kinesis job.
2) Sent messages to KDS while the consumer was down.
3) Restarted the Flink consumer; the messages sent during the consumer's
downtime were never ingested (data loss).
4) Re-sent messages to KDS while the consumer was still up. Those messages
were ingested fine.

How can I avoid the data loss in #3?

From the logs:


2021-04-07 12:15:49,161 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 's3://bucket-xx/flink/checkpoint/iv', savepoints: 'null', asynchronous: TRUE, fileStateThreshold: -1)

2021-04-07 12:16:02,343 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 591 ms).
2021-04-07 12:16:11,951 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 (type=CHECKPOINT) @ 1617797771751 for job 8943d16e22b8aaf65d6b9e2b8bd54113.
2021-04-07 12:16:12,483 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 for job 8943d16e22b8aaf65d6b9e2b8bd54113 (3564 bytes in 411 ms).

Thanks,
Vijay