ThreadPoolSize is per KPL instance, so yes, that is per subtask.
As I previously mentioned, the maximum number of concurrent requests going to
KDS would be capped by MaxConnections.
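
For reference, here is a minimal sketch of how those KPL settings are passed
through to each parallel FlinkKinesisProducer subtask. The region, stream name
and values below are placeholders, not recommendations:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
// KPL properties are forwarded to the native KPL process of each subtask
producerConfig.put("ThreadingModel", "POOLED");
producerConfig.put("ThreadPoolSize", "10");  // threads per KPL instance, i.e. per subtask
producerConfig.put("MaxConnections", "24");  // caps concurrent requests to KDS per subtask

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("your_stream"); // placeholder stream name
kinesis.setDefaultPartition("0");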

On Thu, Jul 23, 2020 at 6:25 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi Gordon,
> Thanks for your reply.
> The FlinkKinesisProducer default is the POOLED ThreadingModel, which is what
> I am using. So does that mean only 10 threads are making calls to KDS by
> default?
> I see from the number of records coming into the KDS that I need only 1-2
> shards. So the bottleneck is on the KPL side.
> Does this mean I have to set a QueueLimit of 500 as shown in the example
> below?
> From what you said, the total MaxConnections would then be, by default, 24 *
> number of subtasks = 24 * 80 = 1920 connections to KDS.
> KPL ThreadPoolSize would be 10 threads by default - is this per subtask?
> If so, would it be 10 * number of subtasks = 10 * 80 = 800 threads?
>
> I am trying to reconcile the difference above. Somewhere I am flooding KPL
> with too many requests and it gives the curl error code 28.
>
> So, calculating the queue limit:
> Based on this, my record size = 1600 bytes, and I have 96 shards.
> Assumption: with the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient. So, queue size per shard = 100kB.
> Queue limit with 96 shards = (96 * 10^5) / 1600 = 6000
> Queue limit with 4 shards = (4 * 10^5) / 1600 = 250
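>
> A quick sketch of that arithmetic, assuming the 100kB-per-shard sizing used
> in the docs ("kinesis" is the producer from the docs example further below):
>
> // queue limit ~ shards * perShardQueueBytes / recordSizeBytes
> final int recordSizeBytes = 1600;
> final long perShardQueueBytes = 100_000L; // ~100kB per shard
> final long queueLimit96 = 96 * perShardQueueBytes / recordSizeBytes; // = 6000
> final long queueLimit4 = 4 * perShardQueueBytes / recordSizeBytes;   // = 250
> kinesis.setQueueLimit((int) queueLimit96);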
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
>  expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Tue, Jul 21, 2020 at 8:00 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Vijay,
>>
>> I'm not entirely sure of the exact semantics between ThreadPoolSize and
>> MaxConnections since they are both KPL configurations (this specific
>> question would probably be better directed to AWS),
>> but my guess would be that the number of concurrent requests the KPL makes
>> to the Kinesis backend is capped by MaxConnections. This is per parallel
>> FlinkKinesisProducer subtask.
>>
>> As for ThreadPoolSize, do note that the default threading model by KPL is
>> PER_REQUEST, for which the KPL native process will launch a thread for each
>> request.
>> Under heavy load, this would of course be an issue. Since you didn't
>> explicitly mention this config, make sure to set this to POOLED to actually
>> make use of a fixed thread pool for requests.
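>>
>> For example, something along these lines (a sketch only - these are the
>> standard KPL property names and the pool size is just illustrative):
>>
>> Properties producerConfig = new Properties();
>> producerConfig.put("ThreadingModel", "POOLED"); // KPL default is PER_REQUEST
>> producerConfig.put("ThreadPoolSize", "10");     // size of the fixed pool in POOLED mode
>> // pass producerConfig to the FlinkKinesisProducer constructor as usual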
>>
>> Overall, my suggestion is to set a reasonable queue limit for the number
>> of records buffered by KPL's native process (by default it is unbounded).
>> Without that in place, under high load you could easily exhaust resources,
>> and it can also lead to unpredictable checkpointing times, since the
>> FlinkKinesisProducer needs to flush pending records on checkpoints
>> (which ultimately also applies backpressure upstream).
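>>
>> i.e. something like the following - the actual limit should be derived from
>> your record size and shard count, the value here is just a placeholder:
>>
>> kinesis.setQueueLimit(6000); // bounds records buffered in the KPL native process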
>>
>> BR,
>> Gordon
>>
>> On Wed, Jul 22, 2020 at 5:21 AM Vijay Balakrishnan <bvija...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> Trying to tune the KPL and FlinkKinesisProducer for a Kinesis Data
>>> Stream (KDS).
>>> Getting the following errors:
>>> 1.
>>> Throttling
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>> at
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
>>>
>>> 2. ERROR
>>> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>>>  - [2020-06-18 15:49:24.238655] [0x00000ed6][0x00007fc2086c8700] [error]
>>> [shard_map.cc:150] Shard map update for stream "...._write" failed. Code:
>>> LimitExceededException
>>> Message: Rate exceeded for stream ..._write under account
>>> 753274046439.; retrying in 1500 ms
>>>
>>> 3. [AWS Log: ERROR] (CurlHttpClient) Curl returned error code 28
>>>
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#backpressure
>>>
>>>
>>> https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties
>>>
>>>
>>> https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/
>>>
>>> These are the KPL property changes I am planning to make (see the
>>> configuration sketch after this list):
>>>
>>> RequestTimeout: 10000 //default 6000 ms
>>>
>>> AggregationEnabled: true //default is true
>>>
>>> ThreadPoolSize: 15 //default 10
>>>
>>> MaxConnections: 48 //default 24 - this might have been a bottleneck
>>> when we flooded KPL with requests. Requests are sent in parallel over
>>> multiple connections to the backend.
>>>
>>> RecordTtl: 10000 //default 30000 ms - drop the record after 10s.
>>>
>>> FailIfThrottled: true //default false - so if throttled, don't retry.
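>>>
>>> Roughly, the configuration sketch I have in mind for wiring these in (the
>>> region is a placeholder, and the values are the ones listed above):
>>>
>>> Properties producerConfig = new Properties();
>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder
>>> producerConfig.put("RequestTimeout", "10000");
>>> producerConfig.put("AggregationEnabled", "true");
>>> producerConfig.put("ThreadPoolSize", "15");
>>> producerConfig.put("MaxConnections", "48");
>>> producerConfig.put("RecordTtl", "10000");
>>> producerConfig.put("FailIfThrottled", "true");
>>> FlinkKinesisProducer<String> kinesis =
>>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);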
>>>
>>>
>>> We were using a sink parallelism of 80, so each parallel subtask corresponds
>>> to one FlinkKinesisProducer. So, 80 * 10 (ThreadPoolSize) = 800 threads.
>>> MaxConnections is 24 from KPL.
>>>
>>> I am not sure about the MaxConnections setting - what does 48 mean here?
>>> Is it 40 (sink parallelism) * 15 (ThreadPoolSize) * 48 calls to the KDS
>>> backend via KPL?
>>>
>>> Any thoughts on how not to overwhelm KPL while handling real-time
>>> streaming load to Kinesis via the FlinkKinesisProducer?
>>>
>>> TIA,
>>>
>>
