Hi Zain,

I'm looping in Danny here; he is probably the most knowledgeable person when
it comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co>
wrote:

> Hi,
> I'm fetching data from Kafka topics, converting it into chunks of <= 1 MB,
> and sinking them to a Kinesis data stream.
> The streaming job is functional; however, I see bursts of data in the
> Kinesis stream, with intermittent dips where the data received drops to 0.
> I'm attaching the configuration parameters for the Kinesis sink. What could
> be causing this?
> The DataStream is fed by a Kafka topic, which in turn is fed from MongoDB;
> the topic holds about 60 million records, all of which are loaded.
> I am trying to configure the parameters so that the 1 MB per-record payload
> limit of Kinesis is not breached. I would appreciate any help with this!
>
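> import java.util.Properties;
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
> import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
>
> Properties producerConfig = new Properties();
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
> // KPL settings: values must be Strings (Properties.getProperty ignores others)
> producerConfig.put("AggregationMaxCount", "3");
> producerConfig.put("AggregationMaxSize", "256");      // bytes
> producerConfig.put("CollectionMaxCount", "3");
> producerConfig.put("CollectionMaxSize", "100000");    // bytes
> producerConfig.put("AggregationEnabled", "true");     // String, not a boolean
> producerConfig.put("RateLimit", "50");                // percent of the per-shard limit
> producerConfig.put("RecordMaxBufferedTime", "1000");  // milliseconds
> producerConfig.put("ThreadingModel", "POOLED");
>
> FlinkKinesisProducer<String> kinesis =
>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");  // same partition key for every record
> kinesis.setQueueLimit(1000);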
>
> *Data in Kinesis:*
> [image: image.png]
>
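
Zain describes packing Kafka records into chunks of at most 1 MB before sinking them to Kinesis. A minimal sketch of that packing step might look like the code below. It assumes records are plain Strings joined with newlines and that the limit is counted in UTF-8 bytes, with 1 MB taken as 1024 * 1024 bytes; the class ChunkSketch and the method chunkRecords are hypothetical names, not part of Flink or the Kinesis connector.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ChunkSketch {
    // Assumption: 1 MB per-record payload limit taken as 1024 * 1024 bytes.
    static final int MAX_CHUNK_BYTES = 1024 * 1024;

    // Hypothetical helper: greedily packs records into newline-separated chunks
    // whose UTF-8 size never exceeds maxBytes; a single oversized record is rejected.
    static List<String> chunkRecords(List<String> records, int maxBytes) {
        List<String> chunks = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int currentBytes = 0;
        int recordsInChunk = 0;
        for (String record : records) {
            int recordBytes = record.getBytes(StandardCharsets.UTF_8).length;
            if (recordBytes > maxBytes) {
                throw new IllegalArgumentException(
                        "record of " + recordBytes + " bytes exceeds " + maxBytes);
            }
            // A '\n' separator (1 byte) is needed only if the chunk already has records.
            int separator = recordsInChunk == 0 ? 0 : 1;
            if (currentBytes + separator + recordBytes > maxBytes) {
                // Adding this record would overflow: emit the current chunk and start a new one.
                chunks.add(current.toString());
                current.setLength(0);
                currentBytes = 0;
                recordsInChunk = 0;
                separator = 0;
            }
            if (separator == 1) {
                current.append('\n');
            }
            current.append(record);
            currentBytes += separator + recordBytes;
            recordsInChunk++;
        }
        if (recordsInChunk > 0) {
            chunks.add(current.toString());
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> records = List.of("aaaa", "bbbb", "cccc");
        // With a toy 9-byte limit, "aaaa\nbbbb" fills the first chunk and "cccc" starts a second.
        List<String> chunks = chunkRecords(records, 9);
        System.out.println(chunks.size()); // prints 2
        // A real job would pass MAX_CHUNK_BYTES instead of 9.
    }
}
```

Greedy packing keeps record order and never emits an empty chunk, but it does not split a single record that is larger than the limit on its own; this sketch simply rejects such a record.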


-- 

Alexander Preuß | Engineer - Data Intensive Systems

alexanderpre...@ververica.com

<https://www.ververica.com/>
