Hi Zain,

Glad you found the problem, good luck!
Thanks,
Danny Cranmer

On Fri, May 20, 2022 at 10:05 PM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hi Danny,
I looked into it in a bit more detail; the bottleneck seems to be the transform function, which is at 100% and causing backpressure. I'm looking into that.
Thanks for your help, much appreciated!

On Fri, May 20, 2022 at 1:24 AM Ber, Jeremy <jd...@amazon.com> wrote:

Hi Zain,

Are you seeing any data loss present within the Flink Dashboard subtasks of each task? On the bottom of your dashboard you should see data going from each blue box to the next. Is this a comprehensive set of data? Meaning, do you see 80M from the source -> first operator -> second operator -> sink?

Secondly, it may be easier to troubleshoot this by removing a few variables. Would you be able to remove the operator which segregates your data into 100-length records and simply forward that data to the next operator? Simultaneously, could you leave the Kinesis Producer configuration settings (apart from the queue limit) at their defaults? This will give a good baseline from which to improve.

Jeremy

From: Zain Haider Nemati <zain.hai...@retailo.co>
Date: Wednesday, May 18, 2022 at 6:15 AM
To: Danny Cranmer <dannycran...@apache.org>
Cc: Alexander Preuß <alexanderpre...@ververica.com>, user <user@flink.apache.org>
Subject: RE: [EXTERNAL] Kinesis Sink - Data being received with intermittent breaks

Hey Danny,
Thanks for getting back to me.
- You are seeing bursty throughput, but the job is keeping up? There is no backpressure? --> Correct, I'm not seeing any backpressure in any of the metrics.
- What is the throughput at the sink? --> Number of records out: 1100 per 10 seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)? --> It is incoming data (MB/s) each second.

So let me explain this in totality: the number of records residing in the source is about 80 million, and the number of records I see in the Kinesis data stream after it has consumed the data from the source is about 20 million, so I'm seeing a lot of data loss. I think this potentially has to do with the intermittent dips I'm seeing in the records coming into the data stream.

What configurations do you generally suggest for data of this range when sinking to a Kinesis data stream?

On Wed, May 18, 2022 at 2:00 AM Danny Cranmer <dannycran...@apache.org> wrote:

Hello Zain,

Thanks for providing the additional information. Going back to the original issue:
- You are seeing bursty throughput, but the job is keeping up? There is no backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages, taking this example:

LogInputStreamReader ... Stage 1 Triggers ... { stream: 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6, KinesisRecords: 3 }

Flush trigger reason:
- manual: the flush was manually triggered
- count: the flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the predicate was matched
- timed: the flush was triggered by the elapsed timer

Input/output:
- UserRecords: number of input records the KPL flushed (this can be higher than KinesisRecords when aggregation is enabled)
- KinesisRecords: number of records shipped to Kinesis Data Streams

Stage 2 Triggers tells us the number of API invocations via the PutRecords field.

I can see from your logs that the majority of flushes are due to the timer, and it does not look overly bursty. It seems to sit at around 3 records per 15 seconds, or 1 record every 5 seconds. This seems very low; is it expected?

Thanks,
Danny Cranmer
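As a quick worked reading of the sample line above: timed: 3 means all three flushes in that logging interval were triggered by the timer, while UserRecords: 6 together with KinesisRecords: 3 means six input records were aggregated into three shipped Kinesis records (roughly a 2:1 aggregation ratio), which lines up with the roughly 3 records per 15 seconds noted above.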
On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hey Danny,
Thanks for having a look at the issue.
I am using a custom Flink operator to segregate the data into a consistent format of length 100, which is no more than 1 MB. The configurations I shared were from when I was exploring tweaking some of them to see if it improves the throughput.

Regarding your queries:
- Which Flink version is this? --> Version 1.13
- Can you see any errors in the Flink logs? --> No; I'm attaching Flink logs taken after I set all the configurations back to their defaults.
- Do you see any errors/throttles in the Kinesis Data Stream metrics? --> I did before segregating the data into smaller chunks; not anymore.
- How many shards does your stream have? --> It has 4 shards.
- What is your sink operator parallelism? --> 1
- What is the general health of your job graph? --> This is the only job running at the moment; it isn't unhealthy.
- Are the operators upstream of the sink backpressured? --> No
- Are you sure the sink is actually the issue here? --> I have used .print() as a sink and I see all the records in real time; it chokes when paired with the Kinesis sink.
- Are there any other potential bottlenecks? --> Data is coming in from the source correctly. I have a flatmap transformation which reads the data and segments it into chunks of <= 1 MB, which is also tested using the .print() sink.
- When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch? --> Correct

Attaching a small chunk of the log file from when the job is started. [It goes down to 0 records for some periods of time as well; in the log file it shows mostly between 3-6 records.]

Really appreciate your response on this, since I have not been able to gather much help from other resources online. It would be great if you could let me know what the issue here could be; let me know if you need anything else as well!

Cheers
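For illustration only, a minimal sketch of the kind of chunking flatmap described above, assuming String records and a size cap expressed in characters (ChunkingFlatMap and MAX_CHARS are placeholder names, not the actual job code; a production version would also need to account for the encoded byte size of multi-byte characters):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

/** Illustrative sketch: splits each incoming record into pieces no longer than MAX_CHARS. */
public class ChunkingFlatMap implements FlatMapFunction<String, String> {

    // Keep each emitted record under the 1 MB Kinesis record limit (assumes ~1 byte per char).
    private static final int MAX_CHARS = 1_000_000;

    @Override
    public void flatMap(String value, Collector<String> out) {
        for (int start = 0; start < value.length(); start += MAX_CHARS) {
            int end = Math.min(start + MAX_CHARS, value.length());
            out.collect(value.substring(start, end));
        }
    }
}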
On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org> wrote:

Hello Zain,

When you say "converting them to chunks of <= 1MB", does this mean you are creating these chunks in a custom Flink operator, or are you relying on the connector to do so? If you are generating your own chunks you can potentially disable aggregation at the sink.

Your throughput is incredibly bursty. I have a few questions:
- Which Flink version is this?
- Can you see any errors in the Flink logs?
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
- How many shards does your stream have?
- What is your sink operator parallelism?
- What is the general health of your job graph?
- Are the operators upstream of the sink backpressured?
- Are you sure the sink is actually the issue here?
- Are there any other potential bottlenecks?
- When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch?

Some comments on your configuration:

As previously mentioned, if you are generating the chunks yourself you can potentially remove the aggregation config and disable aggregation:
- producerConfig.put("AggregationMaxCount", "3");
- producerConfig.put("AggregationMaxSize", "256");
+ producerConfig.put("AggregationEnabled", "false");

These are very low and could conflict with your chunk size. They apply to the PutRecords request, which has a quota of 500 records and 5 MiB. You are setting the max size to 100 kB, which is less than your largest chunk. I would recommend removing these configurations:
- producerConfig.put("CollectionMaxCount", "3");
- producerConfig.put("CollectionMaxSize", "100000");

This is the default threading model, so it can be removed:
- producerConfig.put("ThreadingModel", "POOLED");

This config should not have too much impact. The default is 100 ms, and you are increasing it to 1 s. This could increase your end-to-end latency under low-throughput scenarios:
- producerConfig.put("RecordMaxBufferedTime", "1000");

This config controls the sink backpressure and can also impact throughput. Do you see any logs like "Waiting for the queue length to drop below the limit takes unusually long, still not done after <x> attempts"?
kinesis.setQueueLimit(1000);

Thanks,
Danny
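Putting those suggestions together, a minimal sketch of what the trimmed-down sink setup might look like (the class name KinesisSinkSketch, the buildSink method, and the region, credentials, and stream name are placeholders; whether aggregation should actually be disabled depends on the chunking question above):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import java.util.Properties;

/** Illustrative only: a trimmed-down sink factory based on the suggestions above. */
public class KinesisSinkSketch {

    public static FlinkKinesisProducer<String> buildSink() {
        Properties producerConfig = new Properties();
        producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");            // placeholder
        producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");     // placeholder
        producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx"); // placeholder
        // Records are already chunked to <= 1 MB upstream, so KPL aggregation can be disabled.
        // Collection*, ThreadingModel, and RecordMaxBufferedTime are left at their defaults.
        producerConfig.put("AggregationEnabled", "false");

        FlinkKinesisProducer<String> kinesis =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesis.setDefaultStream("xxx");   // placeholder stream name
        kinesis.setDefaultPartition("0");
        kinesis.setQueueLimit(1000);       // retained from the original job, per the advice above
        return kinesis;
    }
}

If aggregation is to stay enabled instead, the AggregationEnabled line can simply be dropped, since the KPL enables aggregation by default.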
On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <alexanderpre...@ververica.com> wrote:

Hi Zain,

I'm looping in Danny here, he is probably the most knowledgeable when it comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hi,
I'm fetching data from Kafka topics, converting the records to chunks of <= 1 MB, and sinking them to a Kinesis data stream.
The streaming job is functional, however I see bursts of data in the Kinesis stream with intermittent dips where the data received is 0. I'm attaching the configuration parameters for the Kinesis sink. What could be the cause of this issue?
The datastream is fed by a Kafka topic, which in turn is fed by a MongoDB and has about 60 million records, all of which are loaded.
I am trying to configure the parameters in such a way that the 1 MB per record payload limit of Kinesis is not breached. Would appreciate help on this!

producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
producerConfig.put("AggregationMaxCount", "3");
producerConfig.put("AggregationMaxSize", "256");
producerConfig.put("CollectionMaxCount", "3");
producerConfig.put("CollectionMaxSize", "100000");
producerConfig.put("AggregationEnabled", "true");
producerConfig.put("RateLimit", "50");
producerConfig.put("RecordMaxBufferedTime", "1000");
producerConfig.put("ThreadingModel", "POOLED");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(false);
kinesis.setDefaultStream("xxx");
kinesis.setDefaultPartition("0");
kinesis.setQueueLimit(1000);

Data in Kinesis:
[graph screenshot not included]