Hi Zain,

I'm looping in Danny here; he is probably the most knowledgeable when it comes to the Kinesis connector.
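As a side note on the 1 MB payload limit mentioned in the quoted message below, here is a minimal sketch of one way to split a string payload so that each piece stays within a byte budget when encoded as UTF-8. It is not taken from the connector or from the job in question: `RecordChunker`, `chunk`, `utf8Length` and `MAX_RECORD_BYTES` are hypothetical names, and the 1 MiB value is an assumption that should be checked against the current Kinesis quotas (and reduced if the partition key has to fit in the same budget).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class RecordChunker {
    // Assumed per-record budget (1 MiB); hypothetical constant, not a connector setting.
    public static final int MAX_RECORD_BYTES = 1024 * 1024;

    /** Number of bytes a single code point occupies in UTF-8 (upper bound for lone surrogates). */
    static int utf8Length(int codePoint) {
        if (codePoint < 0x80) return 1;
        if (codePoint < 0x800) return 2;
        if (codePoint < 0x10000) return 3;
        return 4;
    }

    /** Splits payload into pieces whose UTF-8 size is at most maxBytes, never cutting a code point. */
    public static List<String> chunk(String payload, int maxBytes) {
        if (maxBytes < 4) {
            // A single code point can need up to 4 bytes, so smaller budgets cannot be honoured.
            throw new IllegalArgumentException("maxBytes must be at least 4");
        }
        List<String> chunks = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        int i = 0;
        while (i < payload.length()) {
            int cp = payload.codePointAt(i);
            int cpBytes = utf8Length(cp);
            if (currentBytes + cpBytes > maxBytes) {
                // Adding this code point would exceed the budget: close the current chunk.
                chunks.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
            }
            current.appendCodePoint(cp);
            currentBytes += cpBytes;
            i += Character.charCount(cp);
        }
        if (current.length() > 0) {
            chunks.add(current.toString());
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Tiny budget used only to make the splitting visible.
        for (String c : chunk("héllo wörld", 4)) {
            System.out.println(c + " -> " + c.getBytes(StandardCharsets.UTF_8).length + " bytes");
        }
    }
}
```

The sketch counts encoded bytes rather than characters, since a Java `String` length can be well below its UTF-8 size for non-ASCII data. Calling `RecordChunker.chunk(value, RecordChunker.MAX_RECORD_BYTES)` before handing records to the sink would be one way to apply it.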
Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

> Hi,
> I'm fetching data from Kafka topics, converting them to chunks of <= 1MB and
> sinking them to a Kinesis data stream.
> The streaming job is functional; however, I see bursts of data in the Kinesis
> stream with intermittent dips where data received is 0. I'm attaching the
> configuration parameters for the Kinesis sink. What could be the cause of this
> issue?
> The data is being fed into the datastream by a Kafka topic, which is being fed
> by a MongoDB and has about 60 million records, which are loaded fully.
> I am trying to configure parameters in such a way that the 1MB per data
> payload limit of Kinesis is not breached. Would appreciate help on this!
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
> producerConfig.put("AggregationMaxCount", "3");
> producerConfig.put("AggregationMaxSize", "256");
> producerConfig.put("CollectionMaxCount", "3");
> producerConfig.put("CollectionMaxSize", "100000");
> producerConfig.put("AggregationEnabled", true);
> producerConfig.put("RateLimit", "50");
> producerConfig.put("RecordMaxBufferedTime", "1000");
> producerConfig.put("ThreadingModel", "POOLED");
> FlinkKinesisProducer<String> kinesis = new
> FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);
>
> *Data in Kinesis:*
> [image: image.png]

--
Alexander Preuß | Engineer - Data Intensive Systems
alexanderpre...@ververica.com
<https://www.ververica.com/>