Hi Vijay,

ThreadPoolSize applies per Kinesis producer, and there is one producer for
each parallel subtask, so the thread pools are not shared across subtasks.
If you are constantly hitting the 1MB per second per shard quota, then the
records will be buffered by the FlinkKinesisProducer.
During this process, backpressure is not applied if you have not configured
an upper bound for the buffer queue.

One other thing to note, which might explain the backpressure at regular
intervals that you are experiencing,
is that the FlinkKinesisProducer needs to flush all pending records in the
buffer before the checkpoint can complete for the sink.
That would also apply backpressure upstream.
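
If you do decide to bound the queue, here is a minimal sketch of the sizing
arithmetic from your earlier mail, in plain Java with no Flink dependency. The
class and method names are made up for illustration; the only real API call is
FlinkKinesisProducer#setQueueLimit, shown in a comment. It assumes the 100 kB
per shard buffer that the docs suggest for the default RecordMaxBufferedTime
of 100 ms.

```java
// Illustrative helper only; QueueLimitSketch and queueLimit are hypothetical names.
final class QueueLimitSketch {

    // Assumed per-shard buffer (100 kB), per the docs' suggestion for the
    // default RecordMaxBufferedTime of 100 ms.
    static final long QUEUE_BYTES_PER_SHARD = 100_000L;

    // queue limit = (number of shards * queue size per shard) / record size
    static int queueLimit(int shards, long queueBytesPerShard, long recordSizeBytes) {
        if (shards <= 0 || queueBytesPerShard <= 0 || recordSizeBytes <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        long limit = Math.multiplyExact((long) shards, queueBytesPerShard) / recordSizeBytes;
        // Keep the result in int range and never return a limit of zero.
        return (int) Math.max(1L, Math.min(limit, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // 1600-byte records, as in your setup.
        System.out.println(queueLimit(96, QUEUE_BYTES_PER_SHARD, 1600L)); // 6000
        System.out.println(queueLimit(4, QUEUE_BYTES_PER_SHARD, 1600L));  // 250

        // With an existing FlinkKinesisProducer instance named kinesis:
        // kinesis.setQueueLimit(queueLimit(96, QUEUE_BYTES_PER_SHARD, 1600L));
    }
}
```

Treat the result as a starting point rather than a tuned value; the right
limit depends on your actual record sizes and on how much buffering you can
tolerate before backpressure kicks in.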

Gordon

On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi Gordon,
> ThreadPoolSize default is 10. I have parallelism of 80 spread out across
> 32 nodes.
> Could it be that the 80 threads get bottlenecked on a common ThreadPool of
> 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers run
> in separate slots/vCPUs and can be spread across 32 nodes in my case, but
> occupy 80 slots/vCPUs. Is my understanding correct, and could this be the
> reason that the KPL gets flooded with too many pending requests at regular
> intervals?
>
> TIA,
>
> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Thanks, Gordon, for your reply.
>>
>> I do not set a queueLimit, so the default unbounded queue size is
>> 2147483647. So, it should just be dropping records being produced from the
>> 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads based on RecordTtl. I
>> do not want backpressure because, as you said, it effectively blocks all
>> upstream operators.
>>
>> But from what you are saying, it will apply backpressure when the number
>> of outstanding records accumulated exceeds the default queue limit of
>> 2147483647. Or does it also do so if it is rate-limited to 1MB per second
>> per shard by Kinesis? The second case, rate limiting by Kinesis, seems more
>> probable.
>>
>> So, calculating the queue limit:
>> My record size = 1600 bytes. I have 96 shards.
>> Assuming that, with the default RecordMaxBufferedTime of 100ms, a queue
>> size of 100kB per shard should be sufficient, queue size/shard = 100kB.
>> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
>> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
>>
>> According to the docs:
>>
>> By default, FlinkKinesisProducer does not backpressure. Instead, records
>> that cannot be sent because of the rate restriction of 1 MB per second per
>> shard are buffered in an unbounded queue and dropped when their RecordTtl
>> expires.
>>
>> To avoid data loss, you can enable backpressuring by restricting the size
>> of the internal queue:
>>
>> // 200 Bytes per record, 1 shard
>> kinesis.setQueueLimit(500);
>>
>>
>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>> It does however apply backpressure (therefore effectively blocking all
>>> upstream operators) when the number of outstanding records accumulated
>>> exceeds a set limit, configured using the
>>> FlinkKinesisProducer#setQueueLimit
>>> method.
>>>
>>> For starters, you can maybe check if that was set appropriately.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
