Hi,

I am facing the same problem. I am using Flink 1.9.0 and consuming from a
Kinesis source with a retention of 1 day. When the job is submitted with the
"latest" initial stream position, it starts well and keeps processing data
from all the shards for a very long period of time without any lag. When the
job fails, it usually also recovers well from the last successful
checkpointed state. Very rarely, however, when the job fails and recovers
from the last successful checkpointed state, I notice a huge lag (1 day, as
per the retention) on one of the streams. I have not yet found a
step-by-step way to reproduce this issue.

So far, I have gathered some more information by customizing the
FlinkKinesisConsumer to emit additional log messages. I noticed that the
number of shards loaded from the checkpoint data during recovery is less
than the actual number of shards in the stream. The Kinesis stream has a
fixed number of shards.

I added one line of debug logging at line 408 to print the size of the
variable "sequenceNumsToRestore", which is populated with the shard details
from the checkpoint data:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L408

In this consumer class, when the "run" method is called, it does the
following:

   - It discovers the shards of the Kinesis stream and selects all those
   shards that this subtask should be scheduled to consume.
   - It then iterates over the discovered shards one by one and checks
   whether each shard's state is available in the recovered state
   "sequenceNumsToRestore":
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L295
   - If it is available, the shard is scheduled with the recovered state.
   - If it is not available, the shard is scheduled with
   "EARLIEST_SEQUENCE_NUM":
   https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L308
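
To make the restore decision easier to follow, here is a minimal sketch of it
in plain Java. It is a simplified model, not the connector's code: the class
name ShardRestoreSketch, the method resolveStartPositions and the use of plain
strings for shard IDs and sequence numbers are assumptions made for
illustration. The placeholder "EARLIEST_SEQUENCE_NUM" only mirrors the value
printed in the log quoted further down in this thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the restore decision; not Flink code.
final class ShardRestoreSketch {

    // Placeholder for the sentinel seen in the logs (assumption: plain string).
    static final String EARLIEST = "EARLIEST_SEQUENCE_NUM";

    // For every discovered shard, use the restored sequence number if the
    // checkpoint contained one, otherwise fall back to the earliest position.
    static Map<String, String> resolveStartPositions(
            List<String> discoveredShardIds,
            Map<String, String> sequenceNumsToRestore) {
        Map<String, String> startPositions = new LinkedHashMap<>();
        for (String shardId : discoveredShardIds) {
            String restored = sequenceNumsToRestore.get(shardId);
            startPositions.put(shardId, restored != null ? restored : EARLIEST);
        }
        return startPositions;
    }

    public static void main(String[] args) {
        // Two shards exist, but the restored state only knows about one of them.
        List<String> discovered =
                Arrays.asList("shardId-000000000083", "shardId-000000000087");
        Map<String, String> restoredState = Collections.singletonMap(
                "shardId-000000000087",
                "49599841594208637293623823226010128300928335129272649074");

        // Shard 83 has no restored entry, so it is read from the earliest position.
        System.out.println(resolveStartPositions(discovered, restoredState));
    }
}
```

With a retention of 1 day, a shard that falls into the fallback branch would
re-read up to a full day of data, which matches the lag described above.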

In my case, the number of shards recovered from the checkpoint data is less
than the actual number of shards, which results in the missing shards being
scheduled from the earliest stream position.
I suspect that the checkpoint is somehow missing state for some of the
shards. But if that were the case, that checkpoint should have failed.
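
A debug check that logs which shards are missing, rather than only the size of
the restored map, might help narrow this down. The following is a rough
sketch under the same simplification as before (shard IDs as plain strings);
the class MissingShardStateCheck and the method missingFromRestoredState are
hypothetical names, not part of the connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical diagnostic helper; not part of the Flink Kinesis connector.
final class MissingShardStateCheck {

    // Returns the discovered shard IDs that have no entry in the restored state,
    // preserving the discovery order.
    static List<String> missingFromRestoredState(
            List<String> discoveredShardIds, Set<String> restoredShardIds) {
        List<String> missing = new ArrayList<>();
        for (String shardId : discoveredShardIds) {
            if (!restoredShardIds.contains(shardId)) {
                missing.add(shardId);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> discovered = Arrays.asList(
                "shardId-000000000083", "shardId-000000000087");
        Set<String> restored = new HashSet<>(
                Arrays.asList("shardId-000000000087"));

        List<String> missing = missingFromRestoredState(discovered, restored);
        if (!missing.isEmpty()) {
            // In a customized consumer this would go to the logger instead.
            System.out.println("Shards without restored state: " + missing);
        }
    }
}
```

Logging the missing shard IDs on every restore would show whether it is always
the same shard that loses its state or a different one each time.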

Any further information to resolve this issue would be highly appreciated.

Regards,
Ravi

On Wed, Oct 16, 2019 at 5:57 AM Yun Tang <myas...@live.com> wrote:

> Hi Steven
>
> If you restore the savepoint/checkpoint successfully, I think this might be
> due to the shard not having been discovered in the previous run, so it is
> consumed from the beginning. Please refer to the implementation here: [1]
>
> [1]
> https://github.com/apache/flink/blob/2c411686d23f456cdc502abf1c6b97a61070a17d/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java#L307
>
> Best
> Yun Tang
> ------------------------------
> *From:* Steven Nelson <snel...@sourceallies.com>
> *Sent:* Wednesday, October 16, 2019 4:31
> *To:* user <user@flink.apache.org>
> *Subject:* Kinesis Connector and Savepoint/Checkpoint restore.
>
> Hello, we currently use Flink 1.9.0 with Kinesis to process data.
>
> We have extended data retention on the Kinesis stream, which gives us 7
> days of data.
>
> We have found that when a savepoint/checkpoint is restored, the Kinesis
> Consumer appears to restart from the start of the stream. The
> flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_millisBehindLatest
> metric reports to Prometheus that it is behind by 7 days when the process
> starts back up from a savepoint.
>
> We have some logs that say:
>
> Subtask 3 will start consuming seeded shard 
> StreamShardHandle{streamName='TheStream',
> shard='{ShardId: shardId-000000000083,HashKeyRange: {StartingHashKey:
> 220651847300296034902031972006537199616,EndingHashKey:
> 223310303291865866647839586127097888767},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220601502339755334362523522663986150244033234226,}}'} from sequence
> number EARLIEST_SEQUENCE_NUM with ShardConsumer 20
>
> This seems to indicate that this shard is starting from the beginning of
> the stream
>
> and some logs that say:
> Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='
> TheStream ', shard='{ShardId: shardId-000000000087,HashKeyRange:
> {StartingHashKey: 231285671266575361885262428488779956224,EndingHashKey:
> 233944127258145193631070042609340645375},SequenceNumberRange:
> {StartingSequenceNumber:
> 49597946220690705320549456855089665537076743690057155954,}}'} from sequence
> number 49599841594208637293623823226010128300928335129272649074 with
> ShardConsumer 21
>
> This shard seems to be resuming from a specific point.
>
> I am assuming that this might be caused by no data being available on the
> shard for the entire stream (possible with this application stage). Is this
> the expected behavior? I had thought it would checkpoint with the most
> recent sequence number, regardless of whether it got data or not.
>
> -Steve
>
>
>
