Thanks Gordon,
So, 10 (ThreadPoolSize) * 80 sub-tasks = 800 threads feed into a queue
(unbounded by default), which then drains through the KPL MaxConnections
(24 by default) to KDS.

This suggests I need to decrease the number of sub-tasks, or call
setQueueLimit(800) and increase MaxConnections to 256 (the maximum allowed).
Checkpointing is not currently enabled.
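
In code form, roughly what I have in mind (a sketch only: the region, stream
name, and SimpleStringSchema are placeholders; the other values are the ones
discussed above):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-west-2");  // placeholder region
// KPL settings are passed straight through to the native producer:
producerConfig.put("ThreadPoolSize", "10");    // default; one pool per parallel subtask
producerConfig.put("MaxConnections", "256");   // raised from the default of 24 (256 is the max)
producerConfig.put("RecordTtl", "30000");      // ms before buffered records are dropped

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("my-output-stream");  // placeholder stream name
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(800);                    // bound the buffer so backpressure kicks in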

Please correct me if I am wrong.

On Tue, Jul 21, 2020 at 7:40 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Vijay,
>
> ThreadPoolSize is per Kinesis producer, and there is one producer for each
> parallel subtask.
> If you are constantly hitting the 1MB per second per shard quota, then the
> records will be buffered by the FlinkKinesisProducer.
> During this process, backpressure is not applied if you have not
> configured an upper bound for the buffer queue.
>
> One other thing to note, which might explain the backpressure at regular
> intervals that you are experiencing,
> is that the FlinkKinesisProducer needs to flush all pending records in the
> buffer before the checkpoint can complete for the sink.
> That would also apply backpressure upstream.
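>
> A rough sketch of that in code (the 60-second interval is just an example;
> the point is that the sink flushes its pending KPL records at each
> checkpoint, so the checkpoint interval bounds how often this happens):
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(60_000);  // Kinesis sink flushes its buffer at each checkpoint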
>
> Gordon
>
> On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi Gordon,
>> The ThreadPoolSize default is 10. I have a parallelism of 80 spread out
>> across 32 nodes.
>> Could it be that the 80 subtasks get bottlenecked on a common thread pool
>> of 10, or does it spawn 80 * 10 threads in total? The Flink TaskManagers
>> run in separate slots/vCPUs and in my case can be spread across 32 nodes
>> while occupying 80 slots/vCPUs. Is my understanding correct, and could this
>> be the reason the KPL gets flooded with too many pending requests at
>> regular intervals?
>>
>> TIA,
>>
>> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Thanks, Gordon, for your reply.
>>>
>>> I do not set a queue limit, so the default queue size is effectively
>>> unbounded (2147483647).
>>> So, it should just be dropping records produced by the 80 (parallelism) *
>>> 10 (ThreadPoolSize) = 800 threads once their RecordTtl expires. I do not
>>> want backpressure since, as you said, it effectively blocks all upstream
>>> operators.
>>>
>>> But from what you are saying, will it apply backpressure only when the
>>> number of outstanding records exceeds the default queue limit of
>>> 2147483647, or does it also do so when Kinesis rate-limits to 1 MB per
>>> second per shard? The second case (rate limiting by Kinesis) seems more
>>> probable.
>>>
>>> So, calculating the queue limit:
>>> Based on this, my record size = 1600 bytes, and I have 96 shards.
>>> Assumption: with the default RecordMaxBufferedTime of 100 ms, a queue size
>>> of 100 kB per shard should be sufficient. So, queue size per shard = 100 kB.
>>> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
>>> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
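>>>
>>> In code form, the same arithmetic (the ~100 kB per-shard buffer is the
>>> assumption above; integer division drops any remainder):
>>>
>>> long recordSizeBytes = 1600;
>>> long perShardBufferBytes = 100_000;  // ~100 kB per shard at the default RecordMaxBufferedTime
>>> long queueLimit96Shards = 96 * perShardBufferBytes / recordSizeBytes;  // 6000
>>> long queueLimit4Shards  =  4 * perShardBufferBytes / recordSizeBytes;  // 250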
>>>
>>> According to the docs:
>>>
>>> By default, FlinkKinesisProducer does not backpressure. Instead,
>>> records that cannot be sent because of the rate restriction of 1 MB per
>>> second per shard are buffered in an unbounded queue and dropped when their
>>> RecordTtl expires.
>>>
>>> To avoid data loss, you can enable backpressuring by restricting the
>>> size of the internal queue:
>>>
>>> // 200 Bytes per record, 1 shard
>>> kinesis.setQueueLimit(500);
>>>
>>>
>>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>>> It does however apply backpressure (therefore effectively blocking all
>>>> upstream operators) when the number of outstanding records accumulated
>>>> exceeds a set limit, configured using the
>>>> FlinkKinesisProducer#setQueueLimit
>>>> method.
>>>>
>>>> For starters, you can maybe check if that was set appropriately.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>>
>>>>
>>>
