Re: Flink Performance Issue
Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can process 20k messages of 4 KB each in about 1 minute. So with a parallelism of 15, I'd expect it to take below 10 s, even with considerable data skew.

I recommend starting from scratch with just a simple source -> sink job. That should be much, much faster. If it is, add complexity back step by step until you find the bottleneck. If it isn't, I suspect your ObjectNodeJsonDeSerializerSchema is the issue. For example, are you creating an ObjectMapper on each invocation? That's a typical mistake.

Best,
Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal wrote:
> I have removed all the business logic (keyBy and window) operator code and
> just had a source and sink to test it.
> The throughput is 20K messages in 2 minutes.
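Arvid's estimate follows from simple scaling arithmetic. The sketch below assumes perfectly linear scaling and uses only his local figure of about 20k records per minute at parallelism 1; the class and method names are illustrative, not part of any Flink API.

```java
/**
 * Back-of-the-envelope scaling estimate. Assumption: throughput grows
 * linearly with parallelism, which real jobs only approximate.
 */
public class ScalingEstimate {

    /** Records per second achieved by one parallel instance. */
    static double perSlotRate(long records, double seconds) {
        return records / seconds;
    }

    /** Ideal wall-clock seconds for `records` spread over `parallelism` instances. */
    static double idealSeconds(long records, double perSlotRate, int parallelism) {
        return records / (perSlotRate * parallelism);
    }

    public static void main(String[] args) {
        long records = 20_000;
        double rate = perSlotRate(records, 60.0); // ~333 records/s per instance
        for (int p : new int[] {1, 15, 45}) {
            System.out.printf("parallelism %2d -> %.1f s%n", p, idealSeconds(records, rate, p));
        }
    }
}
```

Ideal scaling gives about 4 s at parallelism 15, which leaves headroom for skew and overhead before reaching the 10 s mentioned above.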
Re: Flink Performance Issue
Hi Robert,

I have removed all the business logic (keyBy and window) operator code and kept just a source and a sink to test it. The throughput is 20K messages in 2 minutes. It is a simple read from a source (Kafka topic) and write to a sink (Kafka topic). Isn't 2 minutes still poor throughput for such a simple read/write application? Each message is 4 KB.

As I mentioned in my previous emails, I am using keyBy() and window() to handle the business logic. Do you think these operators have a huge impact on performance? Or is it something to do with my Kafka cluster configuration, or with the older Flink version (1.8) that I am using in my application? I am not sure whether Flink 1.8 has a performance issue.

Please let me know. Below is my Kafka cluster configuration.

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=3
zookeeper.session.timeout.ms=18000
log.retention.ms=17280
log.cleanup.policy=delete
group.max.session.timeout.ms=120

Thanks

On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger wrote:
> My bet is you are not getting data into your cluster as fast as you think
> (Idea 2)
Re: Flink Performance Issue
Hi Kamaal,

I would first suggest understanding the performance bottleneck before applying any optimizations.

Idea 1: Are your CPUs fully utilized?
If yes, good: scaling up will probably help.
If not, there's another inefficiency.

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the data and writes it to a discarding sink. Either disable operator chaining to get the records-per-second metrics, or add a custom mapper in between that measures the throughput.
Ideally you'll see that you can read all your data in a few seconds. If not, there's a problem getting your data in.

Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB, the disk can slow you down dramatically.)

Idea 4: Are you under high memory pressure, with your JVMs spending most of their cycles garbage collecting?

My bet is that you are not getting data into your cluster as fast as you think (Idea 2).

On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> wrote:
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
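A minimal plain-Java sketch of the "custom mapper that measures the throughput" from Idea 2, plus a garbage-collection check for Idea 4. It is written against `java.util.function.Function` so it runs without Flink. In a real job the same counting would sit inside a `map()` between the source and a discarding sink, and the GC check would have to run inside the TaskManager JVM. `ThroughputProbe` is a hypothetical name, not a Flink class.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.function.Function;

/** Pass-through stage that counts records and periodically prints records/s. */
public class ThroughputProbe<T> implements Function<T, T> {
    private final long reportEvery;
    private long count;
    private long windowStartNanos = System.nanoTime();

    public ThroughputProbe(long reportEvery) {
        this.reportEvery = reportEvery;
    }

    @Override
    public T apply(T record) {
        count++;
        if (count % reportEvery == 0) {
            long now = System.nanoTime();
            double seconds = (now - windowStartNanos) / 1e9;
            System.out.printf("%d records, %.0f records/s%n", count, reportEvery / seconds);
            windowStartNanos = now; // start the next measurement interval
        }
        return record; // records pass through unchanged
    }

    public long count() {
        return count;
    }

    /** Fraction of JVM uptime spent in garbage collection so far (Idea 4). */
    public static double gcTimeFraction() {
        long gcMillis = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime(); // -1 if the collector does not report it
            if (t > 0) {
                gcMillis += t;
            }
        }
        long uptimeMillis = ManagementFactory.getRuntimeMXBean().getUptime();
        return uptimeMillis == 0 ? 0.0 : (double) gcMillis / uptimeMillis;
    }

    public static void main(String[] args) {
        ThroughputProbe<String> probe = new ThroughputProbe<>(100_000);
        for (int i = 0; i < 300_000; i++) {
            probe.apply("record-" + i);
        }
        System.out.printf("GC time fraction: %.3f%n", gcTimeFraction());
    }
}
```

A GC fraction approaching 1 would match the scenario in Idea 4. Flink's own per-operator records-per-second metrics in the web UI serve the same purpose as the probe once chaining is disabled, as Robert describes.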
Re: Flink Performance Issue
Hi Arvid,

The throughput has decreased further after I removed all the rebalance() calls: processing 20K messages now takes 20 minutes instead of 14 minutes.

Below are the tasks that the Flink application performs. I am using keyBy and window operations. Do you think I am making a mistake here, or does the way I perform the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
        topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);

DataStream cgmStreamData = initialStreamData
    .keyBy(value -> value.findValue("PERSON_ID").asText())
    .flatMap(new SgStreamingTask())
    .setParallelism(Common.FORTY_FIVE);

DataStream artfctOverlapStream = cgmStreamData
    .keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream streamWithSgRoc = artfctOverlapStream
    .keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream cgmExcursionStream = streamWithSgRoc
    .keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

//Add Sink
cgmExcursionStream.addSink(new FlinkKafkaProducer(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(),
        kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);

Implementation classes:

//deserialize the received JSON message
public class ObjectNodeJsonDeSerializerSchema implements KeyedDeserializationSchema {
    public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

//flatMap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction {
    public void flatMap(ObjectNode streamData, Collector out);
}

//persist three state variables and apply business logic
public class ArtifactOverlapProvider extends RichFlatMapFunction implements WindowFunction {
    public void apply(String key, GlobalWindow window, Iterable values, Collector out);
}

//apply business logic
public class SgRocProvider implements WindowFunction {
    public void apply(String key, GlobalWindow window, Iterable values, Collector out);
}

//persist three state variables and apply business logic
public class CGMExcursionProviderStream extends RichFlatMapFunction implements WindowFunction {
    public void apply(String key, GlobalWindow window, Iterable values, Collector out);
}

Thanks
Kamaal

On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise wrote:
> Can you please remove all rebalance and report back? Rebalance is
> counter-productive if you don't exactly know that you need it.
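The pipeline above chains three keyed count windows, each with a slide of 1. As I understand Flink's `countWindow(size, slide)`, it keeps the last `size` elements per key and fires every `slide` elements, so with a slide of 1 every incoming record triggers one `apply()` call per window stage over up to `size` buffered elements. The plain-Java simulation below is not Flink code (`SlidingCountWindow` is a hypothetical name); it only illustrates what each `apply()` call receives and how much work accumulates, under that assumption about the semantics.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simulates keyBy(...).countWindow(size, 1): one evaluation per element, last `size` elements per key. */
public class SlidingCountWindow<K, V> {
    private final int size;
    private final Map<K, Deque<V>> buffers = new HashMap<>();
    private long evaluations;     // how many times apply() would be invoked
    private long elementsScanned; // total elements handed to apply()

    public SlidingCountWindow(int size) {
        this.size = size;
    }

    /** Adds one element for `key` and returns the window contents apply() would see. */
    public List<V> add(K key, V value) {
        Deque<V> buf = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buf.addLast(value);
        if (buf.size() > size) {
            buf.removeFirst(); // evict the oldest element for this key
        }
        evaluations++;
        elementsScanned += buf.size();
        return new ArrayList<>(buf);
    }

    public long evaluations() {
        return evaluations;
    }

    public long elementsScanned() {
        return elementsScanned;
    }

    public static void main(String[] args) {
        SlidingCountWindow<String, Integer> window = new SlidingCountWindow<>(7);
        for (int i = 1; i <= 10; i++) {
            System.out.println("person-1 -> " + window.add("person-1", i));
        }
        System.out.println(window.evaluations() + " evaluations, "
                + window.elementsScanned() + " elements scanned");
    }
}
```

With three such stages, each input record causes three window evaluations, each reading its buffered elements from state. That per-record work is one candidate to measure when looking for the bottleneck.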
Re: Flink Performance Issue
Hi Mohammed,

Something is definitely wrong in your setup. It is safe to assume that you can process 1k records per second per core with Kafka and light processing, so you shouldn't even need to go distributed in your case.

Do you perform any heavy computation? What is your flatMap doing? Are you emitting lots of small records from one big record?

Can you please remove all rebalance() calls and report back? Rebalance is counter-productive unless you know exactly that you need it.

On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal wrote:
> I increased the kafka partition and flink parallelism to 45 and the
> throughput has improved from 20 minutes to 14 minutes (20K records).
Re: Flink Performance Issue
Hi Fabian,

Just an update.

Problem 2:
Caused by: org.apache.kafka.common.errors.NetworkException
This is resolved. It happened because we exceeded the number of partitions allowed for the Kafka cluster (AWS MSK cluster). We deleted unused topics and partitions to resolve the issue.

Problem 1:
I increased the Kafka partitions and the Flink parallelism to 45, and the processing time improved from 20 minutes to 14 minutes (20K records). Can you check the Flink graph and let me know if anything else can be done to improve the throughput further?

Thanks

On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal wrote:
>
> Hi Fabian,
>
> Problem 1:
> I have removed the print sinks and ran the test again. This time
> processing 20K records took 17 minutes (200 records every second).
> Earlier it was 20 minutes for 20K records (parallelism 15 and 15 Kafka
> partitions).
>
> Please find the attached application graph. Can you suggest what else
> is required to improve the throughput further?
>
> Problem 2:
> I also tried increasing the parallelism from 15 to 45 (and the Kafka
> partitions from 15 to 45) to see if this gives better throughput.
>
> After increasing the partitions, I am facing a network issue with the
> Kafka cluster (AWS Managed Streaming for Kafka). I do not get this issue
> with 15 partitions for the Kafka topic. Could this be an issue with
> the Kafka cluster?
>
> Kafka Cluster Configuration:
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=3
> zookeeper.session.timeout.ms=18000
> log.retention.ms=17280
> log.cleanup.policy=delete
> group.max.session.timeout.ms=120
>
> Exception:
> "locationInformation": "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> "message": "Error during disposal of stream operator.",
> "throwableInformation": [
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: The server disconnected
>
> "Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received."
>
> Thanks
>
> On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul wrote:
> > 200 records should definitely be doable.
Re: Flink Performance Issue
Hi Mohammed,

200 records should definitely be doable. The first thing you can do is remove the print sinks: they increase the load on your cluster through the additional IO operations, and they also prevent Flink from fusing operators.

I am interested to see the updated job graph after the removal of the print sinks.

Best,
Fabian
Re: Flink Performance Issue
Hi Mohammed,

Without diving too much into your business logic, one thing that catches my eye is the partitioning you are using. In general, all calls to `keyBy` or `rebalance` are very expensive because all the data is shuffled across downstream tasks. Flink tries to fuse operators with the same key groups together so that there is no communication overhead between them, but this is not possible if there is a shuffle between them. One example is your cgmStream, which is first partitioned by a key and then rebalanced right after.

When applying a `keyBy` operation, it is important to understand what the key distribution of your input data looks like. Some keys may occur very frequently while others appear much less often. This causes skew in your pipeline that cannot be resolved with a higher parallelism (some tasks are overloaded, some are idle).

I also have a couple of follow-up questions to better understand your setup:
- What do you mean by 20k concurrent stream data: 20k records per second?
- How many taskmanagers are you using, and how are the slots distributed?
- Can you check in the Flink WebUI whether some operators are idle, and maybe share an image of the job graph?
- How did you notice the lag of 2k between the operators?

Best,
Fabian
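The key-distribution question can be checked offline before changing the parallelism. The sketch below buckets a key histogram into subtasks and reports the max/mean load. As a simplifying assumption it maps `key.hashCode()` directly modulo the parallelism; Flink instead hashes keys into key groups (bounded by the max parallelism) and then assigns key groups to subtasks, so exact placements differ. `KeySkewCheck` is a hypothetical helper, not a Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Estimates how evenly keyed records spread over parallel subtasks (simplified hashing). */
public class KeySkewCheck {

    /** Records per subtask for a key -> record-count histogram. */
    static long[] loadPerSubtask(Map<String, Long> recordsPerKey, int parallelism) {
        long[] load = new long[parallelism];
        for (Map.Entry<String, Long> e : recordsPerKey.entrySet()) {
            // Simplification: not Flink's key-group assignment.
            load[Math.floorMod(e.getKey().hashCode(), parallelism)] += e.getValue();
        }
        return load;
    }

    /** Max load divided by mean load: 1.0 is perfectly even, higher means skew. */
    static double skewFactor(long[] load) {
        long max = 0;
        long total = 0;
        for (long l : load) {
            max = Math.max(max, l);
            total += l;
        }
        double mean = (double) total / load.length;
        return mean == 0 ? 0.0 : max / mean;
    }

    public static void main(String[] args) {
        Map<String, Long> perKey = new HashMap<>();
        for (int i = 0; i < 1_000; i++) {
            perKey.put("person-" + i, 20L); // 1,000 evenly active keys
        }
        perKey.put("person-hot", 20_000L);  // one very hot key
        long[] load = loadPerSubtask(perKey, 45);
        System.out.printf("skew factor at parallelism 45: %.1f%n", skewFactor(load));
    }
}
```

A factor near 1.0 suggests more parallelism can help. With a single hot key, the subtask that owns it remains the bottleneck regardless of parallelism, which is the overloaded/idle pattern described above.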
Flink Performance Issue
Hi,

Apologies for the long message; I want to explain the issue in detail.

We have a Flink (version 1.8) application running on AWS Kinesis Data Analytics. The application's source is a Kafka topic with 15 partitions (AWS Managed Streaming for Kafka), and its sink is another Kafka topic with 15 partitions. Each stream record is 4 KB, so 20K records amount to 20K × 4 KB ≈ 78 MB.

The application performs some complex business logic on the data and writes the output to the Kafka topic.

In our performance test, processing 20K concurrent stream records (unique keys) takes 25 minutes. Our target is to process them in 5 minutes. I have checked the code and applied all possible optimizations to the business logic, but I still don't see any improvement. I tried increasing the parallelism from 5 to 8, but the throughput is the same with both. I can also see that the stream is distributed across all 8 slots, though there is a lag of 2K between the first operator and the subsequent operators.

Checkpointing is enabled with the Kinesis Data Analytics default interval of one minute. I have also tried setting a different parallelism for each operator.

Can you please suggest any other performance optimizations I should consider, or point out if I am making a mistake here?
Here is my sample code:

StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(
        TOPIC_NAME, new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties));
initialStreamData.print();

DataStream rawDataProcess = initialStreamData.rebalance()
    .flatMap(new ReProcessingDataProvider())
    .keyBy(value -> value.getPersonId());
rawDataProcess.print();

DataStream cgmStream = rawDataProcess.keyBy(new ReProcessorKeySelector())
    .rebalance()
    .flatMap(new SgStreamTask()); //the same person_id key
cgmStream.print();

DataStream artfctOverlapStream = cgmStream.keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider()); //the same person_id key
cgmStream.print();

DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider()); //the same person_id key
streamWithSgRoc.print();

DataStream cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream()); //the same person_id key
cgmExcursionStream.print();

cgmExcursionStream.addSink(new FlinkKafkaProducer(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(),
        kafkaConnectProperties));

Thanks
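For scale, the figures in this message translate into fairly small rates. The sketch below assumes "20K" means 20,000 records and 1 KB = 1,024 bytes; the class name is illustrative only.

```java
/** Arithmetic for the data volume, current rate, and target rate in the message above. */
public class ThroughputTarget {

    /** Total payload in MiB for `records` records of `kibPerRecord` KiB each. */
    static double totalMiB(long records, int kibPerRecord) {
        return records * kibPerRecord / 1024.0;
    }

    /** Average records per second needed to finish `records` within `minutes`. */
    static double recordsPerSecond(long records, double minutes) {
        return records / (minutes * 60.0);
    }

    public static void main(String[] args) {
        long records = 20_000;
        System.out.printf("total volume: %.1f MiB%n", totalMiB(records, 4));
        System.out.printf("current (25 min): %.1f records/s%n", recordsPerSecond(records, 25));
        System.out.printf("target  (5 min):  %.1f records/s%n", recordsPerSecond(records, 5));
    }
}
```

Even the target works out to roughly 67 records/s (about 267 KiB/s), a modest load for Kafka and Flink. This suggests the gap comes from a bottleneck somewhere in the pipeline rather than from raw data volume.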