Hi Vijay,

Your assumption is correct that the discovery interval does not affect the
interval of fetching records.

As a side note, you can actually disable shard discovery by setting the
value to -1.
The FlinkKinesisConsumer would then only call ListShards once at job
startup.
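
A minimal sketch of that setting, assuming the two ConsumerConfigConstants
fields resolve to the property keys written out below (to the best of my
knowledge they do, but in a real job reference the constants themselves).
The class and method names are hypothetical and only there to keep the
snippet self-contained:

```java
import java.util.Properties;

public class ShardDiscoveryConfig {
    // Assumed values of ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS
    // and ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS.
    static final String SHARD_GETRECORDS_INTERVAL_MILLIS =
            "flink.shard.getrecords.intervalmillis";
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
            "flink.shard.discovery.intervalmillis";

    // Hypothetical helper: builds consumer properties with shard discovery disabled.
    static Properties buildConsumerConfig(long getRecordsIntervalMs) {
        Properties config = new Properties();
        // How often each shard consumer fetches records; independent of discovery.
        config.setProperty(SHARD_GETRECORDS_INTERVAL_MILLIS,
                Long.toString(getRecordsIntervalMs));
        // -1 disables periodic shard discovery, as described above.
        config.setProperty(SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
        return config;
    }

    public static void main(String[] args) {
        Properties config = buildConsumerConfig(2000L);
        System.out.println(config.getProperty(SHARD_GETRECORDS_INTERVAL_MILLIS));
        System.out.println(config.getProperty(SHARD_DISCOVERY_INTERVAL_MILLIS));
    }
}
```

The resulting Properties object would be passed to the consumer in place
of the very large discovery interval, together with the usual region and
credential settings.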

Cheers,
Gordon

On Fri, Jul 10, 2020 at 2:35 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi,
> I see these 2 constants- SHARD_GETRECORDS_INTERVAL_MILLIS &
> SHARD_DISCOVERY_INTERVAL_MILLIS.
>
> My understanding was that SHARD_GETRECORDS_INTERVAL_MILLIS defines how often
> records are fetched from the Kinesis Data Stream (KDS). The code seems to be
> doing this in ShardConsumer.run() --> getRecords().
>
> SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the KinesisConsumer
> checks if there are any changes to shards. We don't change shards during
> our application run. I have changed it to a very high value to avoid this
> check, as I was running into ListShards issues with LimitExceededException
> when using 282 shards.
> Would this be a correct understanding of these 2 constants, especially
> SHARD_DISCOVERY_INTERVAL_MILLIS?
>
> My assumption that needs to be validated:
> The SHARD_DISCOVERY_INTERVAL_MILLIS should not affect the fetching of
> records as defined by SHARD_GETRECORDS_INTERVAL_MILLIS.
>
> Code below:
> kinesisConsumerConfig.setProperty(
>     ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
>     getRecsIntervalMs); // 2000
>
> /*
> We do not change shards while the app is running.
> So, we can increase SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value
> to avoid any rateLimiting issues from the AWS API with the ListShards call.
> Default is 10s. We can increase this to avoid this LimitExceededException
> as we don't change shards in the middle.
>  */
>
> kinesisConsumerConfig.setProperty(
>     ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
>     shardDiscoveryInterval); // 18000000 ms
>
>
> TIA,
>
