Hi, Apologize for the big message, to explain the issue in detail.
We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The application has a source which is a kafka topic with 15 partitions (AWS Managed Streaming Kafka) and the sink is again a kafka topic with 15 partitions. The size of each stream data is of 4 KB, so which would be 20K * 4 = ~ 79 MB The application performs some complex business logic with the data and produces the output to the kafka topic. As part of the performance test, the throughput we are getting for 20K (unique keys) concurrent stream data is 25 minutes. Our target is to achieve 20K concurrent stream data in 5 minutes. I have checked the code and did all the optimizations possible to the business logic code, but still don't see any improvement. Tried increasing the parallelism from 5 to 8 but its the same throughput with both 5 and 8 parallelism. I could also see the stream is distributed between all the 8 slots, though there is a lag of 2K between the first operator and the next consecutive operators. Checkpoint is enabled with default (Kinesis analytics) every one minute. Have also tried having different parallelism for each of the operators. Can you please suggest any other performance optimizations that need to be considered or if I am making any mistake here?. Here is my sample code ---------------------------------- StreamExecutionEnvironment streamenv = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<ObjectNode> initialStreamData = streamenv.addSource(new FlinkKafkaConsumer<>(TOPIC_NAME, new ObjectNodeJsonDeSerializerSchema(), kafkaConnectProperties); initialStreamData.print(); DataStream<POJO> rawDataProcess = initialStreamData.rebalance().flatMap(new ReProcessingDataProvider()).keyBy(value -> value.getPersonId()); rawDataProcess.print(); DataStream<POJO> cgmStream = rawDataProcess.keyBy(new ReProcessorKeySelector()).rebalance().flatMap(new SgStreamTask()); //the same person_id key cgmStream.print(); DataStream<POJO> artfctOverlapStream = null; artfctOverlapStream = cgmStreamData.keyBy(new CGMKeySelector()).countWindow(2, 1) .apply(new ArtifactOverlapProvider()); //the same person_id key cgmStreamData.print(); DataStream<POJO> streamWithSgRoc = null; streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector()).countWindow(7, 1) .apply(new SgRocProvider()); // the same person_id key streamWithSgRoc.print(); DataStream<POJO> cgmExcursionStream = null; cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector()) .countWindow(Common.THREE, Common.ONE).apply(new CGMExcursionProviderStream()); //the same person_id key cgmExcursionStream.print(); cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>( topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new CGMDataCollectorSchema(), kafkaConnectProperties)); Thanks