Hi Arvid,

Throughput dropped further after I removed all the rebalance() calls. 
Processing 20K messages now takes 20 minutes, up from 14 minutes.
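
For reference, here is a minimal sketch of the arithmetic behind these figures. It assumes 20K means exactly 20,000 messages; ThroughputCheck and messagesPerSecond are illustrative names only and are not part of the job.

```java
// Illustrative helper (not part of the Flink job): converts
// "N messages in M minutes" into messages per second.
class ThroughputCheck {
    static double messagesPerSecond(long messages, double minutes) {
        return messages / (minutes * 60.0);
    }

    public static void main(String[] args) {
        // With rebalance(): 20,000 messages in 14 minutes.
        System.out.printf("14 min: %.1f msg/s%n", messagesPerSecond(20_000, 14));
        // Without rebalance(): 20,000 messages in 20 minutes.
        System.out.printf("20 min: %.1f msg/s%n", messagesPerSecond(20_000, 20));
    }
}
```

That works out to roughly 24 messages per second before and 17 after, for the whole job at parallelism 45.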

Below are the tasks that the Flink application performs. I am using keyBy 
and window operations. Do you think I am making a mistake here, or does the way 
I perform the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv =
    StreamExecutionEnvironment.getExecutionEnvironment();
initialStreamData = streamenv
    .addSource(new FlinkKafkaConsumer<>(
        topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);

DataStream<CGM> cgmStreamData = initialStreamData
    .keyBy(value -> value.findValue("PERSON_ID").asText())
    .flatMap(new SgStreamingTask())
    .setParallelism(Common.FORTY_FIVE);

DataStream<CGM> artfctOverlapStream = cgmStreamData
    .keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream<CGM> streamWithSgRoc = artfctOverlapStream
    .keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

DataStream<CGMDataCollector> cgmExcursionStream = streamWithSgRoc
    .keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream())
    .setParallelism(Common.FORTY_FIVE)
    .rebalance();

//Add Sink
cgmExcursionStream
    .addSink(new FlinkKafkaProducer<CGMDataCollector>(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(),
        kafkaConnectProperties))
    .setParallelism(Common.FORTY_FIVE);
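
For context on what the countWindow(size, slide) calls above do per key: as far as I understand, Flink builds this from a global window with a count trigger of `slide` and a count evictor of `size`. With a slide of 1, the window function therefore runs once for every incoming record and sees at most the last `size` records for that key. The plain-Java sketch below only imitates that behaviour for illustration. CountWindowSketch and its methods are made-up names, not Flink APIs, and the keys and values are placeholders.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java imitation of a per-key sliding count window (illustration only).
class CountWindowSketch {
    private final int size;   // max elements handed to the window function
    private final int slide;  // fire after every `slide` elements per key
    private final Map<String, Deque<String>> buffers = new HashMap<>();
    private final Map<String, Integer> sinceLastFire = new HashMap<>();

    CountWindowSketch(int size, int slide) {
        this.size = size;
        this.slide = slide;
    }

    // Adds one element; returns the window contents if the window fires, else null.
    List<String> add(String key, String element) {
        Deque<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
        buffer.addLast(element);
        int count = sinceLastFire.merge(key, 1, Integer::sum);
        if (count < slide) {
            return null; // not enough new elements yet, no firing
        }
        sinceLastFire.put(key, 0);
        // Evict the oldest elements so that at most `size` remain.
        while (buffer.size() > size) {
            buffer.removeFirst();
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        // Mirrors countWindow(2, 1) for a single hypothetical key.
        CountWindowSketch window = new CountWindowSketch(2, 1);
        for (String reading : new String[] {"r1", "r2", "r3"}) {
            System.out.println(window.add("person-1", reading));
        }
    }
}
```

In this sketch the three inputs produce [r1], [r1, r2] and [r2, r3], one firing per record. If the real job behaves the same way, each of the three window stages calls its apply() once for every record of every key.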

Implementation classes:-

//deserialize the JSON message received
public class ObjectNodeJsonDeSerializerSchema
        implements KeyedDeserializationSchema<ObjectNode> {
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset);
}

//flatMap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
    public void flatMap(ObjectNode streamData, Collector<CGM> out);
}

//persist three state variables and apply business logic
public class ArtifactOverlapProvider
        extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGM> out);
}

//Apply business logic
public class SgRocProvider
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGM> out);
}

//persist three state variables and apply business logic
public class CGMExcursionProviderStream
        extends RichFlatMapFunction<CGM, Tuple2<Long, Long>>
        implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGMDataCollector> out);
}

Thanks
Kamaal


> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
> Hi Mohammed,
> 
> Something is definitely wrong in your setup. You can safely assume that you 
> can process 1k records per second per core with Kafka and light processing, 
> so you shouldn't even need to go distributed in your case.
> 
> Do you perform any heavy computation? What is your flatMap doing? Are you 
> emitting lots of small records from one big record?
> 
> Can you please remove all rebalance calls and report back? Rebalance is 
> counter-productive if you don't know exactly why you need it.
> 
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <mohammed.kamaa...@gmail.com> 
>> wrote:
>> Hi Fabian,
>> 
>> Just an update,
>> 
>> Problem 2:-
>> ----------------
>> Caused by: org.apache.kafka.common.errors.NetworkException
>> It is resolved. It was because we exceeded the number of allowed
>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>> unused topics and partitions to resolve the issue.
>> 
>> Problem 1:-
>> ----------------
>> I increased the Kafka partitions and Flink parallelism to 45 and the
>> throughput has improved from 20 minutes to 14 minutes (20K records).
>> Can you check the Flink graph and let me know if there is anything
>> else that can be done here to improve the throughput further?
>> 
>> Thanks
>> 
>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>> <mohammed.kamaa...@gmail.com> wrote:
>> >
>> > Hi Fabian,
>> >
>> > Problem 1:-
>> > ---------------------
>> > I have removed the print sinks and run the test again. This time
>> > the throughput is 17 minutes for 20K records (200 records every
>> > second). Earlier it was 20 minutes for 20K records (parallelism 15
>> > and 15 Kafka partitions).
>> >
>> > Please find the attached application graph. Can you suggest what else
>> > can be done to improve the throughput further?
>> >
>> > Problem 2:-
>> > ---------------------
>> > Also, I tried to increase the parallelism to 45 from 15 (also
>> > increasing the kafka partition to 45 from 15) to see if this helps in
>> > getting a better throughput.
>> >
>> > After increasing the partitions, I am facing a network issue with the
>> > Kafka cluster (AWS Managed Streaming for Apache Kafka). I do not get this
>> > issue with 15 partitions for the Kafka topic. Could this be an issue with
>> > the Kafka cluster?
>> >
>> > Kafka Cluster Configuration:-
>> > ---------------------------------------
>> > auto.create.topics.enable=true
>> > log.retention.hours=24
>> > default.replication.factor=3
>> > min.insync.replicas=2
>> > num.io.threads=45
>> > num.network.threads=60
>> > num.partitions=45
>> > num.replica.fetchers=2
>> > unclean.leader.election.enable=true
>> > replica.lag.time.max.ms=30000
>> > zookeeper.session.timeout.ms=18000
>> > log.retention.ms=172800000
>> > log.cleanup.policy=delete
>> > group.max.session.timeout.ms=1200000
>> >
>> > Exception:-
>> > ----------------
>> >  "locationInformation":
>> > "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>> >     "message": "Error during disposal of stream operator.",
>> >     "throwableInformation": [
>> >         "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>> > disconnected
>> >
>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>> > server disconnected before a response was received."
>> >
>> >
>> > Thanks
>> >
>> >
>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com> 
>> > wrote:
>> > >
>> > > Hi Mohammed,
>> > >
>> > > 200 records should definitely be doable. The first thing you can do is 
>> > > remove the print sinks, because they firstly increase the load on your 
>> > > cluster due to the additional IO operations and secondly prevent Flink 
>> > > from fusing operators.
>> > > I am interested to see the updated job graph after the removal of the 
>> > > print sinks.
>> > >
>> > > Best,
>> > > Fabian
