Hi,

Apologies for the long message; I want to explain the issue in detail.

We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The
application's source is a Kafka topic with 15 partitions (AWS Managed
Streaming Kafka), and the sink is another Kafka topic with 15 partitions.

Each stream record is about 4 KB, so 20K records come to 20K * 4 KB = ~80 MB.

The application applies some complex business logic to the data and writes
the output to the Kafka topic.

In our performance test, processing 20K concurrent stream records (20K unique
keys) currently takes 25 minutes.

Our target is to process 20K concurrent stream records in 5 minutes.
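
To put the figures above into per-second terms, here is a small arithmetic
sketch. The class and method names are hypothetical, and it assumes decimal
units (1 MB = 1000 KB) and a steady rate over the whole run.

```java
// Hypothetical helper: converts the figures quoted in this message into
// per-second rates. Not part of the application code.
final class ThroughputEstimate {

    // Average records per second needed to finish `records` in `minutes`.
    static double recordsPerSecond(long records, long minutes) {
        return records / (minutes * 60.0);
    }

    // Total payload in decimal megabytes (1 MB = 1000 KB).
    static double totalMegabytes(long records, long kilobytesPerRecord) {
        return records * kilobytesPerRecord / 1000.0;
    }

    public static void main(String[] args) {
        long records = 20_000;     // 20K unique keys, one record each (assumed)
        long recordSizeKb = 4;     // 4 KB per record

        System.out.println("total MB        : " + totalMegabytes(records, recordSizeKb));
        System.out.println("current rec/s   : " + recordsPerSecond(records, 25));
        System.out.println("target rec/s    : " + recordsPerSecond(records, 5));
    }
}
```

Under these assumptions the job currently averages about 13 records per
second, and the target corresponds to about 67 records per second.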

I have reviewed the code and applied every optimization I could to the
business logic, but I still don't see any improvement.

I tried increasing the parallelism from 5 to 8, but the throughput is the
same with both settings.
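
For reference, a rough sketch of how 15 source partitions would spread over
the source subtasks at each parallelism. It assumes a plain round-robin
assignment (partition index modulo parallelism); the real connector may start
at a different offset, but the per-subtask counts should be the same under
that assumption. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Hypothetical model of partition-to-subtask assignment, independent of Flink.
final class PartitionSpread {

    // Returns how many partitions each subtask reads under round-robin assignment.
    static int[] partitionsPerSubtask(int partitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int p = 0; p < partitions; p++) {
            counts[p % parallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 15 partitions at parallelism 5: every subtask reads 3 partitions.
        System.out.println(Arrays.toString(partitionsPerSubtask(15, 5)));
        // 15 partitions at parallelism 8: seven subtasks read 2, one reads 1.
        System.out.println(Arrays.toString(partitionsPerSubtask(15, 8)));
    }
}
```

In this model, parallelism 8 leaves the source unevenly loaded, while 5 and 15
divide the 15 partitions evenly.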

I can also see that the stream is distributed across all 8 slots, though
there is a lag of 2K between the first operator and the operators that
follow it.

Checkpointing is enabled with the Kinesis Analytics default interval of one
minute.

I have also tried setting a different parallelism for each operator.

Can you please suggest any other performance optimizations I should
consider, or point out any mistake I am making here?

Here is my sample code
----------------------------------

// Source: read JSON records from the Kafka input topic.
StreamExecutionEnvironment streamenv =
    StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv.addSource(
    new FlinkKafkaConsumer<>(TOPIC_NAME, new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties));
initialStreamData.print();

// Rebalance, re-process each record, then key by person_id.
DataStream<POJO> rawDataProcess = initialStreamData.rebalance()
    .flatMap(new ReProcessingDataProvider())
    .keyBy(value -> value.getPersonId());
rawDataProcess.print();

// Keyed by the same person_id key.
DataStream<POJO> cgmStream = rawDataProcess.keyBy(new ReProcessorKeySelector())
    .rebalance()
    .flatMap(new SgStreamTask());
cgmStream.print();

// Sliding count window (size 2, slide 1), keyed by the same person_id key.
DataStream<POJO> artfctOverlapStream = cgmStream.keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider());
cgmStream.print();

// Sliding count window (size 7, slide 1), keyed by the same person_id key.
DataStream<POJO> streamWithSgRoc = artfctOverlapStream.keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider());
streamWithSgRoc.print();

// Sliding count window (Common.THREE, Common.ONE), keyed by the same person_id key.
DataStream<POJO> cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream());
cgmExcursionStream.print();

// Sink: write the results to the Kafka output topic.
cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
    topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
    new CGMDataCollectorSchema(),
    kafkaConnectProperties));
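
The pipeline above chains three keyed sliding count windows. As a rough
illustration of what countWindow(size, slide) produces per key, here is a
Flink-free sketch. It assumes the window fires every `slide` elements and
hands the function the most recent `size` elements for that key; the class
name, the String key, and the Integer payload are all hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model of a keyed sliding count window. Does not use Flink.
final class SlidingCountWindowSketch {
    private final int size;   // maximum number of elements handed to the function
    private final int slide;  // fire once every `slide` elements per key
    private final Map<String, Deque<Integer>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    SlidingCountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Adds one element for `key`; returns the window contents if it fires.
    Optional<List<Integer>> add(String key, int value) {
        Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(value);
        if (buffer.size() > size) {
            buffer.removeFirst();          // keep only the latest `size` elements
        }
        int count = sinceLastFire.merge(key, 1, Integer::sum);
        if (count < slide) {
            return Optional.empty();       // not enough new elements yet
        }
        sinceLastFire.put(key, 0);
        return Optional.of(new ArrayList<>(buffer));
    }

    public static void main(String[] args) {
        SlidingCountWindowSketch window = new SlidingCountWindowSketch(2, 1);
        for (int value : new int[] {10, 20, 30}) {
            System.out.println(window.add("person-1", value));
        }
    }
}
```

In this model a slide of 1 means every incoming record triggers one window
evaluation per stage, and the first evaluations for a key see fewer than
`size` elements.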

Thanks
