Hello Zain,

When you say "converting them to chunks of <= 1MB", do you mean you are creating these chunks in a custom Flink operator, or are you relying on the connector to do so? If you are generating your own chunks, you can potentially disable aggregation at the sink.
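For context, by "a custom Flink operator" I mean something along these lines. This is only a hypothetical sketch (the class name and the 1 MB constant are mine, not from your job), and note that naively slicing a UTF-8 byte array can split a multi-byte character or a logical record in half, so real chunking logic must respect record boundaries:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical example: splits each incoming payload into pieces that
// individually stay below the 1 MB Kinesis record limit.
public class ChunkingFunction implements FlatMapFunction<String, String> {
    private static final int MAX_CHUNK_BYTES = 1_000_000;

    @Override
    public void flatMap(String payload, Collector<String> out) {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        for (int start = 0; start < bytes.length; start += MAX_CHUNK_BYTES) {
            int length = Math.min(MAX_CHUNK_BYTES, bytes.length - start);
            // Simplified: assumes a chunk boundary never falls inside a
            // multi-byte character or a logical record.
            out.collect(new String(bytes, start, length, StandardCharsets.UTF_8));
        }
    }
}

If you are already chunking like this yourself, the KPL aggregation in the sink is redundant and can be disabled.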
Your throughput is incredibly bursty. I have a few questions:

- Which Flink version is this?
- Can you see any errors in the Flink logs?
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
- How many shards does your stream have?
- What is your sink operator parallelism?
- What is the general health of your job graph?
- Are the operators upstream of the sink backpressured?
- Are you sure the sink is actually the issue here?
- Are there any other potential bottlenecks?
- When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch?

Some comments on your configuration:

As previously mentioned, if you are generating the chunks yourself you can remove the aggregation settings and disable aggregation entirely:

- producerConfig.put("AggregationMaxCount", "3");
- producerConfig.put("AggregationMaxSize", "256");
+ producerConfig.put("AggregationEnabled", "false");

These values are very low and could conflict with your chunk size. They configure the PutRecords request, which has a quota of 500 records and 5 MiB per request. You are setting the max size to 100 kB, which is smaller than your largest chunk. I would recommend removing these settings:

- producerConfig.put("CollectionMaxCount", "3");
- producerConfig.put("CollectionMaxSize", "100000");

This is the default threading model, so it can be removed:

- producerConfig.put("ThreadingModel", "POOLED");

This setting should not have much impact. The default is 100 ms and you are increasing it to 1 s, which could increase your end-to-end latency under low-throughput scenarios:

- producerConfig.put("RecordMaxBufferedTime", "1000");

This setting controls the sink backpressure and can also impact throughput. Do you see any logs like "Waiting for the queue length to drop below the limit takes unusually long, still not done after <x> attempts"?

kinesis.setQueueLimit(1000);

A consolidated sketch of the trimmed-down configuration is below.
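For illustration only, here is roughly what the trimmed-down setup could look like. This is a minimal sketch, not a drop-in fix: the region, credentials, stream name, and partition key are placeholders carried over from your snippet, and only the settings discussed above are shown.

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
// Records are already chunked upstream, so KPL aggregation is unnecessary.
producerConfig.put("AggregationEnabled", "false");
// Collection, threading model and RecordMaxBufferedTime are left at the KPL defaults.

FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setDefaultStream("xxx");
// Caution: a fixed partition key routes every record to the same shard.
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);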
Thanks,
Danny

On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <alexanderpre...@ververica.com> wrote:

> Hi Zain,
>
> I'm looping in Danny here, he is probably the most knowledgeable when it
> comes to the Kinesis connector.
>
> Best,
> Alexander
>
> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co> wrote:
>
>> Hi,
>> I'm fetching data from Kafka topics, converting it to chunks of <= 1MB,
>> and sinking them to a Kinesis data stream.
>> The streaming job is functional, but I see bursts of data in the Kinesis
>> stream with intermittent dips where the data received is 0. I'm attaching
>> the configuration parameters for the Kinesis sink. What could be the cause
>> of this issue?
>> The datastream is fed by a Kafka topic, which is in turn fed from a
>> MongoDB holding about 60 million records, all of which are loaded.
>> I am trying to configure the parameters so that the 1MB-per-record payload
>> limit of Kinesis is not breached. Would appreciate help on this!
>>
>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>> producerConfig.put("AggregationMaxCount", "3");
>> producerConfig.put("AggregationMaxSize", "256");
>> producerConfig.put("CollectionMaxCount", "3");
>> producerConfig.put("CollectionMaxSize", "100000");
>> producerConfig.put("AggregationEnabled", true);
>> producerConfig.put("RateLimit", "50");
>> producerConfig.put("RecordMaxBufferedTime", "1000");
>> producerConfig.put("ThreadingModel", "POOLED");
>> FlinkKinesisProducer<String> kinesis =
>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>> kinesis.setFailOnError(false);
>> kinesis.setDefaultStream("xxx");
>> kinesis.setDefaultPartition("0");
>> kinesis.setQueueLimit(1000);
>>
>> *Data in Kinesis:*
>> [image: image.png]
>
> --
> Alexander Preuß | Engineer - Data Intensive Systems
> alexanderpre...@ververica.com
> <https://www.ververica.com/>