Thanks Gordon. So, 10 (ThreadPoolSize) * 80 sub-tasks = 800 threads feed into a queue (unbounded by default), which is then sent through the KPL's MaxConnections (24 by default) to KDS.
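A hedged sketch of the arithmetic above. It assumes, as Gordon explains further down, that there is one Kinesis producer per parallel subtask. It also assumes that ThreadPoolSize and MaxConnections are applied to each producer separately. The class and method names are hypothetical; the numbers are the ones quoted in this thread.

```java
// Hypothetical helper: multiplies per-producer KPL settings by the sink parallelism.
// Assumption: one KinesisProducer per parallel subtask, each with its own settings.
public class KplFootprint {

    // Total native KPL threads across all sink subtasks.
    static int totalThreads(int parallelism, int threadPoolSize) {
        return parallelism * threadPoolSize;
    }

    // Upper bound on connections to KDS if every producer uses its full MaxConnections.
    static int totalConnections(int parallelism, int maxConnections) {
        return parallelism * maxConnections;
    }

    public static void main(String[] args) {
        int parallelism = 80;     // sink subtasks mentioned in this thread
        int threadPoolSize = 10;  // default quoted in this thread
        int maxConnections = 24;  // KPL default quoted in this thread

        System.out.println("threads per subtask:     " + threadPoolSize);
        System.out.println("threads in total:        " + totalThreads(parallelism, threadPoolSize));
        System.out.println("connections per subtask: " + maxConnections);
        System.out.println("connections in total:    " + totalConnections(parallelism, maxConnections));
    }
}
```

Read this way, each of the 80 producers has its own ThreadPoolSize threads and its own MaxConnections budget. The 800 threads are a job-wide total, not a single shared pool.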
This suggests that I need to either decrease the number of sub-tasks, or call setQueueLimit(800) and increase MaxConnections to 256 (the maximum allowed). Checkpointing is not currently enabled. Please correct me if I am wrong.

On Tue, Jul 21, 2020 at 7:40 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi Vijay,
>
> ThreadPoolSize is per Kinesis producer, and there is one producer for each
> parallel subtask.
> If you are constantly hitting the 1MB per second per shard quota, then the
> records will be buffered by the FlinkKinesisProducer.
> During this process, backpressure is not applied if you have not
> configured an upper bound for the buffer queue.
>
> One other thing to note, which might explain the backpressure at regular
> intervals that you are experiencing,
> is that the FlinkKinesisProducer needs to flush all pending records in the
> buffer before the checkpoint can complete for the sink.
> That would also apply backpressure upstream.
>
> Gordon
>
> On Fri, Jul 10, 2020 at 7:02 AM Vijay Balakrishnan <bvija...@gmail.com>
> wrote:
>
>> Hi Gordon,
>> ThreadPoolSize default is 10. I have a parallelism of 80 spread out across
>> 32 nodes.
>> Could it be that the 80 threads get bottlenecked on a common ThreadPool
>> of 10, or is it spawning 80 * 10 threads in total? The Flink TaskManagers
>> run in separate slots/vCPUs and, in my case, can be spread across 32 nodes
>> while occupying 80 slots/vCPUs. Is my understanding correct, and could this
>> be the reason that the KPL gets flooded with too many pending requests at
>> regular intervals?
>>
>> TIA,
>>
>> On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Thanks, Gordon, for your reply.
>>>
>>> I do not set a queueLimit, so the default (unbounded) queue size is
>>> 2147483647.
>>> So it should just be dropping records produced by the
>>> 80 (parallelism) * 10 (ThreadPoolSize) = 800 threads once their RecordTtl
>>> expires. I do not want backpressure since, as you said, it effectively
>>> blocks all upstream operators.
>>>
>>> But from what you are saying, does it apply backpressure only when the
>>> number of outstanding records accumulated exceeds the default queue limit
>>> of 2147483647, or *does it also do so when it is rate-limited to 1MB per
>>> second per shard by Kinesis*? The second case, rate limiting by Kinesis,
>>> seems more probable.
>>>
>>> So, calculating the queue limit:
>>> My record size is 1600 bytes, and I have 96 shards.
>>> Assuming, with the default RecordMaxBufferedTime of 100ms, that a queue
>>> size of 100kB per shard should be sufficient: queue size/shard = 100kB.
>>> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
>>> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
>>>
>>> According to the docs:
>>>
>>> By default, FlinkKinesisProducer does not backpressure. Instead,
>>> records that cannot be sent because of the rate restriction of 1 MB per
>>> second per shard are buffered in an unbounded queue and dropped when their
>>> RecordTtl expires.
>>>
>>> To avoid data loss, you can enable backpressuring by restricting the
>>> size of the internal queue:
>>>
>>> // 200 Bytes per record, 1 shard
>>> kinesis.setQueueLimit(500);
>>>
>>>
>>> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi Vijay,
>>>>
>>>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>>>> It does, however, apply backpressure (therefore effectively blocking all
>>>> upstream operators) when the number of outstanding records accumulated
>>>> exceeds a set limit, configured using the
>>>> FlinkKinesisProducer#setQueueLimit
>>>> method.
>>>>
>>>> For starters, you can check whether that was set appropriately.
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
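The queue-limit arithmetic from the July 9 message can be checked with a short sketch. It follows the heuristic quoted there: queue limit = (number of shards * queue size per shard) / record size, with about 100kB (10^5 bytes) of buffer per shard at the default RecordMaxBufferedTime of 100ms. The class and method names are hypothetical and only illustrate the formula.

```java
// Sketch of the queue-limit heuristic discussed in this thread:
// queueLimit = (numShards * queueBytesPerShard) / recordSizeBytes
public class QueueLimitEstimate {

    // ~100 kB of buffered data per shard (10^5 bytes), per the quoted heuristic.
    static final long QUEUE_BYTES_PER_SHARD = 100_000L;

    // Number of records that fits into ~100 kB per shard (integer division).
    static long queueLimit(int numShards, int recordSizeBytes) {
        if (numShards <= 0 || recordSizeBytes <= 0) {
            throw new IllegalArgumentException("shards and record size must be positive");
        }
        return (numShards * QUEUE_BYTES_PER_SHARD) / recordSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(queueLimit(96, 1600)); // 6000
        System.out.println(queueLimit(4, 1600));  // 250
        System.out.println(queueLimit(1, 200));   // 500, the docs example
    }
}
```

The resulting value is what would be passed to FlinkKinesisProducer#setQueueLimit. With one producer per parallel subtask, the limit presumably applies to each subtask's buffer separately rather than to the job as a whole.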