Hi, 

This exception is usually thrown by aws-java-sdk and means that your Kinesis 
stream is hitting a throughput limit (what a surprise). We ran into the same 
thing when we had a single "event-bus" style stream with multiple Flink apps 
reading from it.

Each Kinesis shard (partition) has a limit of 5 read (GetRecords) operations 
per second. If you have a stream with 4 shards and 30 jobs reading from it, my 
guess is that each job is constantly hitting the per-shard operation limit with 
the default Kinesis consumer settings and then goes into exponential back-off 
(it just sleeps for a short period of time and then retries).
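
To put rough numbers on this, here is a small sketch in plain Java with no Flink or AWS dependencies. It assumes that every job issues one GetRecords call per shard per poll interval, and that the connector's default poll interval is 200 ms (check this for your Flink version). The class and method names are made up for illustration.

```java
public class ShardPollRate {
    // Per-shard read limit mentioned above: 5 GetRecords calls per second.
    static final double LIMIT_PER_SHARD_PER_SECOND = 5.0;

    /** Calls per second hitting one shard, assuming each job polls every shard once per interval. */
    static double requestsPerSecond(int jobs, long pollIntervalMillis) {
        return jobs * 1000.0 / pollIntervalMillis;
    }

    /** Smallest poll interval (ms) that keeps the combined rate at or under the limit. */
    static long minIntervalMillis(int jobs) {
        return (long) Math.ceil(jobs * 1000.0 / LIMIT_PER_SHARD_PER_SECOND);
    }

    public static void main(String[] args) {
        int jobs = 30;
        // 200 ms is the assumed default, 2000 ms is a slower polling setting.
        for (long interval : new long[] {200, 2000}) {
            System.out.printf("interval=%d ms -> %.1f req/s per shard (limit %.1f)%n",
                    interval, requestsPerSecond(jobs, interval), LIMIT_PER_SHARD_PER_SECOND);
        }
        System.out.println("min interval for " + jobs + " jobs: "
                + minIntervalMillis(jobs) + " ms");
    }
}
```

Under these assumptions, 30 jobs polling every 200 ms produce about 150 calls per second per shard against a limit of 5. Even polling every 2 s still gives about 15, so some throttling remains and the retry/backoff settings still matter.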

You have two options here:
1. Scale up the Kinesis stream, so there are more shards and higher overall 
throughput limits.
2. Tune the Kinesis consumer backoff parameters.

Our current ones, for example, look like this:

 // poll every 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "2000");
 // in case of a throughput error, the initial pause is 2s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_BASE, "2000");
 // the pause can grow up to 10s
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_MAX, "10000");
 // multiply the pause by 1.5 on each subsequent retry
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");
 // and make up to 100 retries
 conf.put(ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, "100");
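
To see what these values mean in practice, here is a small sketch that prints the pause schedule for base 2000 ms, max 10000 ms and constant 1.5. It assumes the pause for a given retry is capped at min(max, base * constant^attempt). The connector may add random jitter on top, so treat the numbers as upper bounds rather than exact sleep times. The class and method names are hypothetical and not part of the Flink API.

```java
public class BackoffSchedule {
    /**
     * Upper bound of the pause before retry number `attempt` (0-based),
     * assuming the cap is min(max, base * constant^attempt).
     */
    static long pauseCapMillis(long baseMillis, long maxMillis, double constant, int attempt) {
        return (long) Math.min(maxMillis, baseMillis * Math.pow(constant, attempt));
    }

    public static void main(String[] args) {
        long base = 2000;      // SHARD_GETRECORDS_BACKOFF_BASE
        long max = 10000;      // SHARD_GETRECORDS_BACKOFF_MAX
        double constant = 1.5; // SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT

        // Print the cap for the first six retries.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("retry " + attempt + ": up to "
                    + pauseCapMillis(base, max, constant, attempt) + " ms");
        }
    }
}
```

With these settings the cap goes 2000, 3000, 4500, 6750 ms and then stays at 10000 ms from the fifth retry on, so with 100 retries a shard reader can ride out a long throttled period before the job fails.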

with best regards,
Roman Grebennikov | g...@dfdx.me


On Mon, Jun 15, 2020, at 13:45, M Singh wrote:
> Hi:
> 
> I am running multiple (almost 30 and growing) Flink streaming applications 
> that read from the same Kinesis stream, and they get a 
> ProvisionedThroughputExceededException, which fails the job.
> I have seen a reference 
> http://mail-archives.apache.org/mod_mbox/flink-user/201811.mbox/%3CCAJnSTVxpuOhCNTFTvEYd7Om4s=q2vz5-8+m4nvuutmj2oxu...@mail.gmail.com%3E
>  - which indicates there might be some solution perhaps in Flink 1.8/1.9. 
> 
> I also see that FLINK-10536 (Flink Kinesis Consumer leading to job failure 
> due to ProvisionedThroughputExceededException) 
> <https://issues.apache.org/jira/browse/FLINK-10536> is still open.
> 
> 
> So I wanted to find out:
> 
> 1. Has this issue been resolved, and if so, in which version?
> 2. Is there a Kinesis consumer with Kinesis fan-out available that can help 
> address this issue?
> 3. Is there any specific parameter in the Kinesis consumer config that can 
> address this issue?
> 
> If there is any other pointer/documentation/reference, please let me know.
> 
> Thanks
> 
