Hi Zain,

Glad you found the problem, good luck!

Thanks,
Danny Cranmer

On Fri, May 20, 2022 at 10:05 PM Zain Haider Nemati <zain.hai...@retailo.co>
wrote:

> Hi Danny,
> I looked into it in more detail; the bottleneck seems to be the transform
> function, which is running at 100% and causing backpressure. I'm looking
> into that.
> Thanks for your help, much appreciated!
>
> On Fri, May 20, 2022 at 1:24 AM Ber, Jeremy <jd...@amazon.com> wrote:
>
>> Hi Zain—
>>
>>
>>
>> Are you seeing any data loss within the Flink Dashboard subtasks of each
>> task? At the bottom of your dashboard you should see data flowing from each
>> blue box to the next. Is this a complete set of data, i.e. do you see 80M
>> records from the source -> first operator -> second operator -> sink?
>>
>>
>>
>> Secondly, it may be easier to troubleshoot this by removing a few
>> variables. Would you be able to remove the operator which segregates your
>> data into records of length 100 and simply forward that data to the next
>> operator? At the same time, could you leave the Kinesis Producer
>> configuration settings (apart from the queue limit) at their defaults? This
>> will give you a good baseline to improve upon.
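>>
>> For illustration, the baseline topology could be wired roughly like this (a
>> minimal sketch; kafkaConsumer and kinesis are placeholders standing in for
>> your existing Kafka source and a sink built with default producer settings,
>> not your actual code):
>>
>> // inside your job's main(), which already declares throws Exception
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // your existing Kafka source, unchanged
>> DataStream<String> records = env.addSource(kafkaConsumer);
>> // straight to the Kinesis sink, skipping the chunking operator
>> records.addSink(kinesis);
>> env.execute("kinesis-baseline-test");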
>>
>>
>>
>> Jeremy
>>
>>
>>
>> *From: *Zain Haider Nemati <zain.hai...@retailo.co>
>> *Date: *Wednesday, May 18, 2022 at 6:15 AM
>> *To: *Danny Cranmer <dannycran...@apache.org>
>> *Cc: *Alexander Preuß <alexanderpre...@ververica.com>, user <
>> user@flink.apache.org>
>> *Subject: *RE: [EXTERNAL]Kinesis Sink - Data being received with
>> intermittent breaks
>>
>>
>>
>>
>>
>>
>> Hey Danny,
>>
>> Thanks for getting back to me.
>>
>> - You are seeing bursty throughput, but the job is keeping up? There is
>> no backpressure? --> Correct, I'm not seeing any backpressure in any of the
>> metrics.
>>
>> - What is the throughput at the sink? --> The number of records out is
>> around 1100 per 10 seconds.
>>
>> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>> --> It is incoming data (MB/s), sampled each second.
>>
>>
>>
>> To explain the full picture: the source holds about 80 million records, but
>> the number of records I see in the Kinesis data stream after the job has
>> consumed the data from the source is about 20 million. So I'm seeing a lot
>> of data loss, and I think it may be related to the intermittent dips I'm
>> seeing in the records coming into the data stream.
>>
>>
>>
>> What configuration would you generally suggest for data volumes in this
>> range when sinking to a Kinesis data stream?
>>
>>
>>
>> On Wed, May 18, 2022 at 2:00 AM Danny Cranmer <dannycran...@apache.org>
>> wrote:
>>
>> Hello Zain,
>>
>>
>>
>> Thanks for providing the additional information. Going back to the
>> original issue:
>>
>> - You are seeing bursty throughput, but the job is keeping up? There is
>> no backpressure?
>>
>> - What is the throughput at the sink?
>>
>> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>>
>>
>>
>> Let me shed some light on the log messages, let's take this example:
>>
>>
>>
>> LogInputStreamReader ... Stage 1 Triggers ...  { stream:
>> 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
>> UserRecords: 6, KinesisRecords: 3 }
>>
>>
>>
>> Flush trigger reason:
>>
>> - manual: the flush was manually triggered
>>
>> - count: flush was triggered by the number of records in the container
>>
>> - size: the flush was triggered by the number of bytes in the container
>>
>> - matches: the predicate was matched
>>
>> - timed: the flush is triggered by elapsed timer
>>
>>
>>
>> Input/Output:
>>
>> - UserRecords: Number of input records KPL flushed (this can be higher
>> than KinesisRecords when aggregation is enabled)
>>
>> - KinesisRecords: Number of records shipped to Kinesis Data Streams
>>
>>
>>
>> Stage 2 Triggers tells us the number of API invocations via
>> the PutRecords field.
>>
>>
>>
>> I can see from your logs that the majority of flushes are due to the
>> timer, and it does not look overly bursty. Seems to sit at around 3 records
>> per 15 seconds, or 1 record every 5 seconds. This seems very low, is it
>> expected?
>>
>>
>>
>> Thanks,
>>
>> Danny Cranmer
>>
>>
>>
>> On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <
>> zain.hai...@retailo.co> wrote:
>>
>> Hey Danny,
>>
>> Thanks for having a look at the issue.
>>
>> I am using a custom Flink operator to segregate the data into a consistent
>> format of length 100, which is no more than 1 MB. The configuration I shared
>> was from when I was exploring tweaking some of the settings to see if it
>> improved the throughput.
>>
>>
>>
>> Regarding your queries :
>>
>> - Which Flink version is this? -- > *Version 1.13*
>>
>> - Can you see any errors in the Flink logs? --> *No, I'm attaching Flink
>> logs after setting all the configurations back to their defaults*
>>
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>> --> *I was before segregating into smaller chunks; not anymore*
>>
>> - How many shards does your stream have? --> *It has 4 shards*
>>
>> - What is your sink operator parallelism? --> *1*
>>
>> - What is the general health of your job graph? --> *This is the only
>> job running at the moment; it isn't unhealthy*
>>
>>   - Are the operators upstream of the sink backpressured? --> *No*
>>
>>   - Are you sure the sink is actually the issue here? --> *I have used
>> .print() as a sink and I'm seeing all the records in real time; it chokes
>> when paired with the Kinesis sink*
>>
>>   - Are there any other potential bottlenecks? --> *Data is coming in from
>> the source correctly. I have a flatmap transformation which reads it and
>> segments it into chunks of <= 1MB; this has also been tested using the
>> .print() sink (a simplified sketch is below this list)*
>>
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch? --> *Correct*
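>>
>> Roughly, the segmenting flatmap looks something like this (a simplified
>> sketch rather than the exact code; the class name and the byte-based
>> splitting shown here are only illustrative):
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.util.Collector;
>> import java.nio.charset.StandardCharsets;
>>
>> public class ChunkingFlatMap implements FlatMapFunction<String, String> {
>>     private static final int MAX_BYTES = 1_000_000; // stay under the 1 MB Kinesis record limit
>>
>>     @Override
>>     public void flatMap(String value, Collector<String> out) {
>>         byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
>>         if (bytes.length <= MAX_BYTES) {
>>             out.collect(value);
>>             return;
>>         }
>>         // naive split by byte offset; the real operator also keeps a consistent record length
>>         for (int start = 0; start < bytes.length; start += MAX_BYTES) {
>>             int end = Math.min(start + MAX_BYTES, bytes.length);
>>             out.collect(new String(bytes, start, end - start, StandardCharsets.UTF_8));
>>         }
>>     }
>> }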
>>
>>
>>
>> I'm attaching a small chunk of the log file from when the job is started.
>> [It goes down to 0 records for some periods of time as well; in the log file
>> it shows mostly between 3 and 6 records.]
>>
>>
>>
>> I really appreciate your response on this, since I have not been able to
>> find much help from other resources online. It would be great if you could
>> let me know what the issue here might be; let me know if you need anything
>> else as well!
>>
>>
>>
>> Cheers
>>
>>
>>
>>
>>
>> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org>
>> wrote:
>>
>> Hello Zain,
>>
>>
>>
>> When you say "converting them to chunks of <= 1MB", does this mean you
>> are creating these chunks in a custom Flink operator, or are you relying on
>> the connector to do so? If you are generating your own chunks you can
>> potentially disable Aggregation at the sink.
>>
>>
>>
>> Your throughput is incredibly bursty. I have a few questions:
>>
>> - Which Flink version is this?
>>
>> - Can you see any errors in the Flink logs?
>>
>> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>>
>> - How many shards does your stream have?
>>
>> - What is your sink operator parallelism?
>>
>> - What is the general health of your job graph?
>>
>>   - Are the operators upstream of the sink backpressured?
>>
>>   - Are you sure the sink is actually the issue here?
>>
>>   - Are there any other potential bottlenecks?
>>
>> - When you say you are trying to achieve "1MB chunks", I assume this is
>> per Kinesis record, not per PutRecords batch?
>>
>>
>>
>> Some comments on your configuration:
>>
>>
>>
>> As previously mentioned, if you are generating the chunks you can
>> potentially remove the aggregation config and disable it.
>>
>> - producerConfig.put("AggregationMaxCount", "3");
>> - producerConfig.put("AggregationMaxSize", "256");
>>
>> + producerConfig.put("AggregationEnabled", "false");
>>
>>
>>
>> This is very low, and could conflict with your chunk size. These
>> configurations are regarding the PutRecords request, which has a quota of
>> 500 records and 5MiB. You are setting the max size to 100kB, which is less
>> than your largest chunk. I would recommend removing these configurations.
>>
>> - producerConfig.put("CollectionMaxCount", "3");
>> - producerConfig.put("CollectionMaxSize", "100000");
>>
>>
>>
>> This is the default threading model, so can be removed.
>>
>> - producerConfig.put("ThreadingModel", "POOLED");
>>
>>
>>
>> This config should not have too much impact. The default is 100ms, you
>> are increasing to 1s. This could increase your end-to-end latency under low
>> throughput scenarios.
>>
>> - producerConfig.put("RecordMaxBufferedTime", "1000");
>>
>>
>>
>> This config controls the sink backpressure and can also impact
>> throughput. Do you see any logs like "Waiting for the queue length to drop
>> below the limit takes unusually long, still not done after <x> attempts"?
>>
>> kinesis.setQueueLimit(1000);
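>>
>> Putting these suggestions together, a stripped-down configuration might look
>> roughly like this (a sketch only; region, credentials and stream name are
>> placeholders, as in your snippet, and the same imports apply):
>>
>> Properties producerConfig = new Properties();
>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>> // aggregation off because the chunks are built upstream; collection,
>> // rate limit and threading settings are left at their defaults
>> producerConfig.put("AggregationEnabled", "false");
>>
>> FlinkKinesisProducer<String> kinesis =
>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>> kinesis.setFailOnError(false);
>> kinesis.setDefaultStream("xxx");
>> kinesis.setDefaultPartition("0");
>> // keep the queue limit, and watch the logs for the warning mentioned above
>> kinesis.setQueueLimit(1000);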
>>
>>
>>
>> Thanks,
>>
>> Danny
>>
>>
>>
>> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
>> alexanderpre...@ververica.com> wrote:
>>
>> Hi Zain,
>>
>>
>>
>> I'm looping in Danny here, he is probably the most knowledgeable when it
>> comes to the Kinesis connector.
>>
>>
>>
>> Best,
>>
>> Alexander
>>
>>
>>
>> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
>> zain.hai...@retailo.co> wrote:
>>
>> Hi,
>>
>> I'm fetching data from Kafka topics, converting it into chunks of <= 1MB,
>> and sinking it to a Kinesis data stream.
>>
>> The streaming job is functional; however, I see bursts of data in the
>> Kinesis stream with intermittent dips where the data received is 0. I'm
>> attaching the configuration parameters for the Kinesis sink. What could be
>> the cause of this issue?
>>
>> The data is fed into the datastream by a Kafka topic, which in turn is fed
>> by a MongoDB with about 60 million records, all of which are loaded.
>>
>> I am trying to configure the parameters in such a way that the 1MB per
>> record payload limit of Kinesis is not breached. I would appreciate help on
>> this!
>>
>>
>>
>> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
>> producerConfig.put("AggregationMaxCount", "3");
>> producerConfig.put("AggregationMaxSize", "256");
>> producerConfig.put("CollectionMaxCount", "3");
>> producerConfig.put("CollectionMaxSize", "100000");
>> producerConfig.put("AggregationEnabled", "true");
>> producerConfig.put("RateLimit", "50");
>> producerConfig.put("RecordMaxBufferedTime", "1000");
>> producerConfig.put("ThreadingModel", "POOLED");
>>
>> FlinkKinesisProducer<String> kinesis =
>>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
>> kinesis.setFailOnError(false);
>> kinesis.setDefaultStream("xxx");
>> kinesis.setDefaultPartition("0");
>> kinesis.setQueueLimit(1000);
>>
>>
>>
>> *Data in Kinesis:*
>>
>>
>>
>>
>> --
>>
>> *Alexander Preuß* | Engineer - Data Intensive Systems
>>
>> alexanderpre...@ververica.com
>>
>> <https://www.ververica.com/>
>>
>>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>>
>> Ververica GmbH
>>
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>
>> Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
>> Jason, Jinwei (Kevin) Zhang
>>
>>
>>
>>
