Hi Zain—

Are you seeing any data loss within the Flink Dashboard subtasks of each task? 
At the bottom of your dashboard you should see data flowing from each blue box 
to the next. Are the counts complete end to end? In other words, do you see 80M 
records from the source -> first operator -> second operator -> sink?

Secondly, it may be easier to troubleshoot this by removing a few variables. 
Would you be able to remove the operator which segregates your data into 
records of length 100 and simply forward that data to the next operator? At the 
same time, could you leave the Kinesis Producer configuration settings (apart 
from the queue limit) at their defaults? This will give a good baseline to 
improve upon.
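
For illustration only, the stripped-down baseline could look roughly like this 
(sourceStream is a placeholder for your Kafka source, and the imports are the 
same ones your existing job already uses):

    // Only region/credentials set; every other KPL setting left at its default.
    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");

    FlinkKinesisProducer<String> kinesis =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    kinesis.setDefaultStream("xxx");
    kinesis.setDefaultPartition("0");
    kinesis.setQueueLimit(1000); // the one non-default setting kept

    // Chunking operator removed: forward the source records straight to the sink.
    sourceStream.addSink(kinesis);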

Jeremy

From: Zain Haider Nemati <zain.hai...@retailo.co>
Date: Wednesday, May 18, 2022 at 6:15 AM
To: Danny Cranmer <dannycran...@apache.org>
Cc: Alexander Preuß <alexanderpre...@ververica.com>, user 
<user@flink.apache.org>
Subject: RE: [EXTERNAL]Kinesis Sink - Data being received with intermittent 
breaks


Hey Danny,
Thanks for getting back to me.
- You are seeing bursty throughput, but the job is keeping up? There is no 
backpressure? --> Correct, I'm not seeing any backpressure in any of the metrics.
- What is the throughput at the sink? --> The number of records out is 1100 per 
10 seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)? --> It 
is incoming data (MB/s), each second.

So let me explain this in full: the source holds about 80 million records, but 
after the job has consumed the data from the source I only see about 20 million 
records in the Kinesis data stream. So I'm seeing a lot of data loss, and I 
think it potentially has to do with the intermittent dips I'm seeing in the 
records arriving in the data stream.

What configurations do you generally suggest for data in this range when 
sinking to a Kinesis data stream?

On Wed, May 18, 2022 at 2:00 AM Danny Cranmer 
<dannycran...@apache.org> wrote:
Hello Zain,

Thanks for providing the additional information. Going back to the original 
issue:
- You are seeing bursty throughput, but the job is keeping up? There is no 
backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages. Let's take this example:

LogInputStreamReader ... Stage 1 Triggers ...  { stream: 'flink-kafka-tracer', 
manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6, 
KinesisRecords: 3 }

Flush trigger reasons:
- manual: the flush was manually triggered
- count: the flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the flush was triggered because the predicate was matched
- timed: the flush was triggered by the elapsed timer

Input/Output:
- UserRecords: Number of input records KPL flushed (this can be higher than 
KinesisRecords when aggregation is enabled)
- KinesisRecords: Number of records shipped to Kinesis Data Streams

The Stage 2 triggers tell us the number of API invocations via the PutRecords 
field.

I can see from your logs that the majority of flushes are due to the timer, and 
it does not look overly bursty. It seems to sit at around 3 records per 15 
seconds, or 1 record every 5 seconds. This seems very low; is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati 
<zain.hai...@retailo.co> wrote:
Hey Danny,
Thanks for having a look at the issue.
I am using a custom Flink operator to segregate the data into a consistent 
format of length 100, which is no more than 1 MB, roughly along the lines of 
the sketch below. The configurations I shared were from when I was exploring 
tweaking some of them to see if that improves the throughput.
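
A simplified illustration of what that operator does (the real implementation 
differs, and the class/field names here are made up):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Buffer incoming records and emit them in batches of 100; each batch stays
    // well under the 1 MB Kinesis record limit for our record sizes.
    // (The buffer is a plain field here, i.e. not checkpointed state.)
    public class ChunkingFlatMap implements FlatMapFunction<String, String> {
        private final List<String> buffer = new ArrayList<>();

        @Override
        public void flatMap(String value, Collector<String> out) {
            buffer.add(value);
            if (buffer.size() >= 100) {
                out.collect(String.join("\n", buffer));
                buffer.clear();
            }
        }
    }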

Regarding your queries:
- Which Flink version is this? --> Version 1.13
- Can you see any errors in the Flink logs? --> No. I'm attaching Flink logs 
from after I set all the configurations back to their defaults.
- Do you see any errors/throttles in the Kinesis Data Stream metrics? --> I was 
seeing some before I started segregating into smaller chunks; not anymore.
- How many shards does your stream have? --> It has 4 shards
- What is your sink operator parallelism? --> 1
- What is the general health of your job graph? --> This is the only job 
running at the moment; it isn't unhealthy.
  - Are the operators upstream of the sink backpressured? --> No
  - Are you sure the sink is actually the issue here? --> I have used .print() 
as a sink and I'm seeing all the records in real time; it only chokes when 
paired with the Kinesis sink (see the short sketch after this list).
  - Are there any other potential bottlenecks? --> Data is coming in from the 
source correctly. I have a flatMap transformation which reads it and segments 
it into chunks of <= 1 MB, and that is also tested using the .print() sink.
- When you say you are trying to achieve "1MB chunks", I assume this is per 
Kinesis record, not per PutRecords batch? --> Correct
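
For reference, the diagnostic swap is essentially this (illustrative only; 
chunkedStream is just a placeholder for the output of the flatMap):

    // Keeps up with the source in real time:
    chunkedStream.print();

    // Chokes / shows the intermittent dips when the Kinesis sink is attached instead:
    // chunkedStream.addSink(kinesis);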

Attaching a small chunk of the log file from when the job is started. [It goes 
down to 0 records for some periods of time as well; in the log file it mostly 
shows between 3-6 records.]

I really appreciate your response on this, since I have not been able to find 
much help from other resources online. It would be great if you could let me 
know what the issue here might be; let me know if you need anything else from 
me as well!

Cheers


On Tue, May 17, 2022 at 12:34 AM Danny Cranmer 
<dannycran...@apache.org> wrote:
Hello Zain,

When you say "converting them to chunks of <= 1MB " does this mean you are 
creating these chunks in a custom Flink operator, or you are relying on the 
connector to do so? If you are generating your own chunks you can potentially 
disable Aggregation at the sink.

Your throughput is incredibly bursty; I have a few questions:
- Which Flink version is this?
- Can you see any errors in the Flink logs?
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
- How many shards does your stream have?
- What is your sink operator parallelism?
- What is the general health of your job graph?
  - Are the operators upstream of the sink backpressured?
  - Are you sure the sink is actually the issue here?
  - Are there any other potential bottlenecks?
- When you say you are trying to achieve "1MB chunks", I assume this is per 
Kinesis record, not per PutRecords batch?

Some comments on your configuration:

As previously mentioned, if you are generating the chunks you can potentially 
remove the aggregation config and disable it.
- producerConfig.put("AggregationMaxCount", "3");
- producerConfig.put("AggregationMaxSize", "256");
+ producerConfig.put("AggregationEnabled", "false");

This is very low, and could conflict with your chunk size. These configurations 
are regarding the PutRecords request, which has a quota of 500 records and 
5MiB. You are setting the max size to 100kB, which is less than your largest 
chunk. I would recommend removing these configurations.
- producerConfig.put("CollectionMaxCount", "3");
- producerConfig.put("CollectionMaxSize", "100000");

This is the default threading model, so can be removed.
- producerConfig.put("ThreadingModel", "POOLED");

This config should not have too much impact. The default is 100 ms and you are 
increasing it to 1 s. This could increase your end-to-end latency under 
low-throughput scenarios.
- producerConfig.put("RecordMaxBufferedTime", "1000");

This config controls the sink backpressure and can also impact throughput. Do 
you see any logs like "Waiting for the queue length to drop below the limit 
takes unusually long, still not done after <x> attempts"?
kinesis.setQueueLimit(1000);
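
Putting the above together, a reduced configuration along these lines would be 
a reasonable baseline (illustrative only; region, credentials and stream name 
as in your snippet, and keep the queue limit only if you still need it):

    Properties producerConfig = new Properties();
    producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
    // You build your own <= 1MB chunks, so disable KPL aggregation and drop the
    // Aggregation*/Collection*/ThreadingModel/RecordMaxBufferedTime overrides.
    producerConfig.put("AggregationEnabled", "false");

    FlinkKinesisProducer<String> kinesis =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    kinesis.setFailOnError(false);
    kinesis.setDefaultStream("xxx");
    kinesis.setDefaultPartition("0");
    kinesis.setQueueLimit(1000);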

Thanks,
Danny

On Mon, May 16, 2022 at 5:27 PM Alexander Preuß 
<alexanderpre...@ververica.com> wrote:
Hi Zain,

I'm looping in Danny here, he is probably the most knowledgeable when it comes 
to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati 
<zain.hai...@retailo.co> wrote:
Hi,
I'm fetching data from Kafka topics, converting them to chunks of <= 1MB, and 
sinking them to a Kinesis data stream.
The streaming job is functional; however, I see bursts of data in the Kinesis 
stream with intermittent dips where the data received is 0. I'm attaching the 
configuration parameters for the Kinesis sink. What could be the cause of this 
issue?
The data is fed into the DataStream by a Kafka topic, which is in turn fed by a 
MongoDB and holds about 60 million records, all of which are loaded.
I am trying to configure the parameters in such a way that the 1 MB per-record 
payload limit of Kinesis is not breached. Would appreciate help on this!

producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
producerConfig.put("AggregationMaxCount", "3");
producerConfig.put("AggregationMaxSize", "256");
producerConfig.put("CollectionMaxCount", "3");
producerConfig.put("CollectionMaxSize", "100000");
producerConfig.put("AggregationEnabled", true);
producerConfig.put("RateLimit", "50");
producerConfig.put("RecordMaxBufferedTime", "1000");
producerConfig.put("ThreadingModel", "POOLED");
FlinkKinesisProducer<String> kinesis =
        new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(false);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);

Data in Kinesis: [inline screenshot]


--

Alexander Preuß | Engineer - Data Intensive Systems


