Re: Flink Performance Issue

2021-09-27 Thread Arvid Heise
Hi Kamaal,

I did a quick test with a local Kafka in Docker. With parallelism 1, I can
process 20k messages of size 4 KB in about 1 min. So if you use a parallelism
of 15, I'd expect it to take less than 10 s even with significant data skew.
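(Rough arithmetic behind that estimate: 20,000 messages / 60 s is about 330
messages/s on one slot; assuming throughput scales roughly linearly with
parallelism, 15 slots would give around 5,000 messages/s, i.e. 20K messages in
roughly 4 s, which leaves plenty of headroom for skew.)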

What I recommend is to start from scratch and just work with a simple
source -> sink job. That should be much, much faster. If it is, you can add
complexity back until you find the bottleneck.

If not, I suspect your ObjectNodeJsonDeSerializerSchema is the issue.
For example, are you creating a new ObjectMapper on each invocation? That's
a typical mistake (see the sketch below).
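As an illustration, here is a minimal sketch of a deserializer that builds the
ObjectMapper once per task instead of once per record. It assumes Jackson and
the KeyedDeserializationSchema interface already used in this thread; the class
name is made up.

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class ReusingObjectNodeDeserializerSchema
        implements KeyedDeserializationSchema<ObjectNode> {

    // The ObjectMapper is expensive to construct and not serializable, so keep
    // one transient instance per task and create it lazily on the TaskManager.
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic,
                                  int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        return (ObjectNode) mapper.readTree(message);
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}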

Best,

Arvid

On Mon, Sep 27, 2021 at 2:38 PM Mohammed Kamaal 
wrote:

> Hi Robert,
>
> I have removed all the business logic (keyBy and window) operator code and
> just had a source and sink to test it.
> The throughput is 20K messages in 2 minutes. It is a simple read from
> source (kafka topic) and write to sink (kafka topic). Don't you think 2
> minutes is also not a better throughput for a simple read/write
> application?. Each message is 4 KB.
>
> As I had mentioned in the previous email(s), I am using keyBy() and
> Window() to handle business logic. Do you think these operators would have
> a huge impact on the performance?. Or is it something to do with my Kafka
> cluster configuration or the older version of flink (1.8) that I am using
> in my application. Not sure if flink version 1.8 has a performance issue.
>
> Please let me know.
> Below is my kafka cluster configuration.
>
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=3
> zookeeper.session.timeout.ms=18000
> log.retention.ms=17280
> log.cleanup.policy=delete
> group.max.session.timeout.ms=120
>
>
>
> Thanks
>
> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger 
> wrote:
>
>> Hi Kamaal,
>>
>> I would first suggest understanding the performance bottleneck, before
>> applying any optimizations.
>>
>> Idea 1: Are your CPUs fully utilized?
>> if yes, good, then scaling up will probably help
>> If not, then there's another inefficiency
>>
>> Idea 2: How fast can you get the data into your job, without any
>> processing?
>> You can measure this by submitting a simple Flink job that just reads the
>> data and writes it to a discarding sink. Either disable the operator
>> chaining to get metrics for the records per second, or add a custom mapper
>> in between that measures the throughput.
>> Ideally you see here that you can read all your data in a few seconds, if
>> not, then there's a problem getting your data in.
>>
>> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB,
>> the disk can dramatically slow you down)
>> Idea 4: Are you under high memory pressure, and your JVMs are spending
>> most of their cycles garbage collecting?
>>
>> My bet is you are not getting data into your cluster as fast as you think
>> (Idea 2)
>>
>>
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> The throughput has decreased further after I removed all the
>>> rebalance(). The performance has decreased from 14 minutes for 20K messages
>>> to 20 minutes for 20K messages.
>>>
>>> Below are the tasks that the flink application is performing. I am using
>>> keyBy and Window operation. Do you think am I making any mistake here or
>>> the way I am performing the keyBy or Window operation needs to be
>>> corrected?.
>>>
>>> //Add Source
>>> StreamExecutionEnvironment streamenv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> initialStreamData = streamenv.addSource(new
>>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>>> new *ObjectNodeJsonDeSerializerSchema()*,
>>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream cgmStreamData = initialStreamData.keyBy(value ->
>>> value.findValue("PERSON_ID").asText())
>>> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>>>
>>> DataStream artfctOverlapStream = cgmStreamData.keyBy(new
>>> CGMKeySelector()).countWindow(2, 1)
>>> .apply(new *ArtifactOverlapProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
>>> CGMKeySelector()).countWindow(7, 1)
>>> .apply(new *SgRocProvider()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> DataStream cgmExcursionStream =
>>> streamWithSgRoc.keyBy(new CGMKeySelector())
>>> .countWindow(Common.THREE, Common.ONE).apply(new
>>> *CGMExcursionProviderStream()*
>>> ).setParallelism(Common.FORTY_FIVE).rebalance();
>>>
>>> //Add Sink
>>> cgmExcursionStream.addSink(new FlinkKafkaProducer(
>>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
>>> CGMDataCollectorSchema(),
>>> 

Re: Flink Performance Issue

2021-09-27 Thread Mohammed Kamaal
Hi Robert,

I have removed all the business logic (keyBy and window) operator code and kept
just a source and sink to test it.
The throughput is 20K messages in 2 minutes. It is a simple read from a source
(Kafka topic) and write to a sink (Kafka topic). Don't you think 2 minutes is
still poor throughput for a simple read/write application? Each message
is 4 KB.

As I mentioned in the previous email(s), I am using keyBy() and window() to
handle the business logic. Do you think these operators have a huge impact on
the performance? Or is it something to do with my Kafka cluster configuration,
or the older version of Flink (1.8) that I am using in my application? I am not
sure whether Flink 1.8 has a performance issue.

Please let me know.
Below is my Kafka cluster configuration.

auto.create.topics.enable=true
log.retention.hours=24
default.replication.factor=3
min.insync.replicas=2
num.io.threads=45
num.network.threads=60
num.partitions=45
num.replica.fetchers=2
unclean.leader.election.enable=true
replica.lag.time.max.ms=3
zookeeper.session.timeout.ms=18000
log.retention.ms=17280
log.cleanup.policy=delete
group.max.session.timeout.ms=120



Thanks

> On Wed, Sep 22, 2021 at 9:06 PM Robert Metzger  wrote:
> Hi Kamaal,
> 
> I would first suggest understanding the performance bottleneck, before 
> applying any optimizations.
> 
> Idea 1: Are your CPUs fully utilized?
> if yes, good, then scaling up will probably help
> If not, then there's another inefficiency
> 
> Idea 2: How fast can you get the data into your job, without any processing?
> You can measure this by submitting a simple Flink job that just reads the 
> data and writes it to a discarding sink. Either disable the operator chaining 
> to get metrics for the records per second, or add a custom mapper in between 
> that measures the throughput.
> Ideally you see here that you can read all your data in a few seconds, if 
> not, then there's a problem getting your data in.
> 
> Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB, the 
> disk can dramatically slow you down)
> Idea 4: Are you under high memory pressure, and your JVMs are spending most 
> of their cycles garbage collecting?
> 
> My bet is you are not getting data into your cluster as fast as you think 
> (Idea 2)
> 
> 
>> On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal 
>>  wrote:
>> Hi Arvid,
>> 
>> The throughput has decreased further after I removed all the rebalance(). 
>> The performance has decreased from 14 minutes for 20K messages to 20 minutes 
>> for 20K messages.
>> 
>> Below are the tasks that the flink application is performing. I am using 
>> keyBy and Window operation. Do you think am I making any mistake here or the 
>> way I am performing the keyBy or Window operation needs to be corrected?.
>> 
>> //Add Source
>> StreamExecutionEnvironment streamenv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> initialStreamData = streamenv.addSource(new 
>> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
>> new ObjectNodeJsonDeSerializerSchema(), 
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream cgmStreamData = initialStreamData.keyBy(value -> 
>> value.findValue("PERSON_ID").asText())
>> .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);
>> 
>> DataStream artfctOverlapStream = cgmStreamData.keyBy(new 
>> CGMKeySelector()).countWindow(2, 1)
>> .apply(new 
>> ArtifactOverlapProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new 
>> CGMKeySelector()).countWindow(7, 1)
>> .apply(new SgRocProvider()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> DataStream cgmExcursionStream = streamWithSgRoc.keyBy(new 
>> CGMKeySelector())
>> .countWindow(Common.THREE, Common.ONE).apply(new 
>> CGMExcursionProviderStream()).setParallelism(Common.FORTY_FIVE).rebalance();
>> 
>> //Add Sink
>> cgmExcursionStream.addSink(new FlinkKafkaProducer(
>> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new 
>> CGMDataCollectorSchema(),
>> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>> 
>> Implementation classes:-
>> 
>> //deserialize the json message received
>> ObjectNodeJsonDeSerializerSchema implements 
>> KeyedDeserializationSchema{
>> public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
>> topic, int partition, long offset);
>> }
>> 
>> //Flapmap to check each message and apply validation
>> public class SgStreamingTask extends RichFlatMapFunction {
>> void flatMap(ObjectNode streamData, Collector out);
>> }
>> 
>> //persist three state variables and apply business logic
>> public class ArtifactOverlapProvider extends RichFlatMapFunction> Tuple2>
>> implements WindowFunction {
>> public void apply(String key, GlobalWindow window, Iterable values, 
>> Collector out);
>> }
>> 
>> //Apply business logic
>> public class SgRocProvider implements 

Re: Flink Performance Issue

2021-09-22 Thread Robert Metzger
Hi Kamaal,

I would first suggest understanding the performance bottleneck, before
applying any optimizations.

Idea 1: Are your CPUs fully utilized?
If yes, good: then scaling up will probably help.
If not, then there's another inefficiency.

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the
data and writes it to a discarding sink. Either disable the operator
chaining to get metrics for the records per second, or add a custom mapper
in between that measures the throughput.
Ideally you see here that you can read all your data in a few seconds; if
not, then there's a problem getting your data in.
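As an illustration of Idea 2, a minimal probe job along these lines; this is
only a sketch, and the topic name, schema and properties are placeholders for
whatever your real job uses:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class IngestThroughputProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Disable chaining so the WebUI shows records/sec separately for the
        // source and the sink instead of one fused task.
        env.disableOperatorChaining();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "throughput-probe");     // placeholder

        env.addSource(new FlinkKafkaConsumer<>("input-topic",
                        new SimpleStringSchema(), props))
           // Throw everything away: whatever rate you see here is the pure
           // ingest rate of the cluster.
           .addSink(new DiscardingSink<>());

        env.execute("ingest-throughput-probe");
    }
}

If you prefer the custom-mapper variant, a RichMapFunction that registers a
Meter via getRuntimeContext().getMetricGroup().meter(...) and calls markEvent()
for every record gives you the same numbers without disabling chaining.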

Idea 3: Is your IO fully utilized? (If you are checkpointing to RocksDB, the
disk can dramatically slow you down.)
Idea 4: Are you under high memory pressure, with your JVMs spending most of
their cycles garbage collecting?

My bet is that you are not getting data into your cluster as fast as you think
(Idea 2).


On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
mohammed.kamaa...@gmail.com> wrote:

> Hi Arvid,
>
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
>
> Below are the tasks that the flink application is performing. I am using
> keyBy and Window operation. Do you think am I making any mistake here or
> the way I am performing the keyBy or Window operation needs to be
> corrected?.
>
> //Add Source
> StreamExecutionEnvironment streamenv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> initialStreamData = streamenv.addSource(new
> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
> new *ObjectNodeJsonDeSerializerSchema()*,
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> DataStream cgmStreamData = initialStreamData.keyBy(value ->
> value.findValue("PERSON_ID").asText())
> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>
> DataStream artfctOverlapStream = cgmStreamData.keyBy(new
> CGMKeySelector()).countWindow(2, 1)
> .apply(new *ArtifactOverlapProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
> CGMKeySelector()).countWindow(7, 1)
> .apply(new *SgRocProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream cgmExcursionStream =
> streamWithSgRoc.keyBy(new CGMKeySelector())
> .countWindow(Common.THREE, Common.ONE).apply(new
> *CGMExcursionProviderStream()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> //Add Sink
> cgmExcursionStream.addSink(new FlinkKafkaProducer(
> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
> CGMDataCollectorSchema(),
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> *Implementation classes:-*
>
> //deserialize the json message received
> *ObjectNodeJsonDeSerializerSchema* implements
> KeyedDeserializationSchema{
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
> topic, int partition, long offset);
> }
>
> //Flapmap to check each message and apply validation
> public class *SgStreamingTask* extends RichFlatMapFunction CGM> {
> void flatMap(ObjectNode streamData, Collector out);
> }
>
> //persist three state variables and apply business logic
> public class *ArtifactOverlapProvider* extends RichFlatMapFunction Tuple2>
> implements WindowFunction {
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //Apply business logic
> public class *SgRocProvider* implements WindowFunction GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
> }
>
> //persist 3 state variables and apply business logic
> public class *CGMExcursionProviderStream* extends
> RichFlatMapFunction>
> implements WindowFunction{
> public void apply(String key, GlobalWindow window, Iterable values,
> Collector out);
>
> }
>
> Thanks
> Kamaal
>
>
> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise  wrote:
>
>> Hi Mohammed,
>>
>> something is definitely wrong in your setup. You can safely say that you
>> can process 1k records per second and core with Kafka and light processing,
>> so you shouldn't even need to go distributed in your case.
>>
>> Do you perform any heavy computation? What is your flatMap doing? Are you
>> emitting lots of small records from one big record?
>>
>> Can you please remove all rebalance and report back? Rebalance is
>> counter-productive if you don't exactly know that you need it.
>>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Just an update,
>>>
>>> Problem 2:-
>>> 
>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>> It is resolved. It was because we exceeded the number of allowed
>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>> unused topics 

Re: Flink Performance Issue

2021-09-22 Thread Mohammed Kamaal
Hi Arvid,

The throughput has decreased further after I removed all the rebalance() calls.
The performance has gone from 14 minutes for 20K messages to 20 minutes for
20K messages.

Below are the tasks that the Flink application is performing. I am using keyBy
and window operations. Do you think I am making a mistake here, or does the way
I am performing the keyBy or window operations need to be corrected?

//Add Source
StreamExecutionEnvironment streamenv =
    StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv.addSource(new
    FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

DataStream<CGM> cgmStreamData = initialStreamData
    .keyBy(value -> value.findValue("PERSON_ID").asText())
    .flatMap(new SgStreamingTask()).setParallelism(Common.FORTY_FIVE);

DataStream<CGM> artfctOverlapStream = cgmStreamData
    .keyBy(new CGMKeySelector()).countWindow(2, 1)
    .apply(new ArtifactOverlapProvider())
    .setParallelism(Common.FORTY_FIVE).rebalance();

DataStream<CGM> streamWithSgRoc = artfctOverlapStream
    .keyBy(new CGMKeySelector()).countWindow(7, 1)
    .apply(new SgRocProvider())
    .setParallelism(Common.FORTY_FIVE).rebalance();

DataStream<CGM> cgmExcursionStream = streamWithSgRoc
    .keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream())
    .setParallelism(Common.FORTY_FIVE).rebalance();

//Add Sink
cgmExcursionStream.addSink(new FlinkKafkaProducer(
    topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
    new CGMDataCollectorSchema(),
    kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);

Implementation classes:-

//Deserialize the JSON message received
public class ObjectNodeJsonDeSerializerSchema
        implements KeyedDeserializationSchema<ObjectNode> {
    public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic,
            int partition, long offset);
}

//FlatMap to check each message and apply validation
public class SgStreamingTask extends RichFlatMapFunction<ObjectNode, CGM> {
    void flatMap(ObjectNode streamData, Collector<CGM> out);
}

//Persist three state variables and apply business logic
public class ArtifactOverlapProvider extends RichFlatMapFunction
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGM> out);
}

//Apply business logic
public class SgRocProvider
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGM> out);
}

//Persist three state variables and apply business logic
public class CGMExcursionProviderStream extends RichFlatMapFunction
        implements WindowFunction<CGM, CGM, String, GlobalWindow> {
    public void apply(String key, GlobalWindow window, Iterable<CGM> values,
            Collector<CGM> out);
}

Thanks
Kamaal


> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise  wrote:
> Hi Mohammed,
> 
> something is definitely wrong in your setup. You can safely say that you can 
> process 1k records per second and core with Kafka and light processing, so 
> you shouldn't even need to go distributed in your case.
> 
> Do you perform any heavy computation? What is your flatMap doing? Are you 
> emitting lots of small records from one big record?
> 
> Can you please remove all rebalance and report back? Rebalance is 
> counter-productive if you don't exactly know that you need it.
> 
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal  
>> wrote:
>> Hi Fabian,
>> 
>> Just an update,
>> 
>> Problem 2:-
>> 
>> Caused by: org.apache.kafka.common.errors.NetworkException
>> It is resolved. It was because we exceeded the number of allowed
>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>> unused topics and partitions to resolve the issue.
>> 
>> Problem 1:-
>> 
>> I increased the kafka partition and flink parallelism to 45 and the
>> throughput has improved from 20 minutes to 14 minutes (20K records).
>> Can you check the flink graph and let me know if there is anything
>> else that can be done here to improve the throughput further.
>> 
>> Thanks
>> 
>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>  wrote:
>> >
>> > Hi Fabian,
>> >
>> > Problem 1:-
>> > -
>> > I have removed the print out sink's and ran the test again. This time
>> > the throughput is 17 minutes for 20K records (200 records every
>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>> > and kafka partition of 15)
>> >
>> > Please find the attached application graph. Can you suggest what else
>> > is required further to improve the throughput.
>> >
>> > Problem 2:-
>> > -
>> > Also, I tried to increase the parallelism to 45 from 15 (also
>> > increasing the kafka partition to 45 from 15) to see if this helps in
>> > getting a better throughput.
>> >
>> > After increasing the partition, I am facing the Network issue with
>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>> > with 15 partitions for the kafka topic. This could be an issue with

Re: Flink Performance Issue

2021-09-06 Thread Arvid Heise
Hi Mohammed,

Something is definitely wrong in your setup. You can safely assume that you
can process 1k records per second per core with Kafka and light processing,
so you shouldn't even need to go distributed in your case.
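(To put that rule of thumb against the numbers in this thread: at roughly 1k
records/s per core, the 20K-record test set should finish in the order of 20
seconds on a single core, so runtimes of 14 to 20 minutes point to the setup
rather than to missing capacity.)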

Do you perform any heavy computation? What is your flatMap doing? Are you
emitting lots of small records from one big record?

Can you please remove all rebalance() calls and report back? Rebalancing is
counter-productive unless you know exactly that you need it.

On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal 
wrote:

> Hi Fabian,
>
> Just an update,
>
> Problem 2:-
> 
> Caused by: org.apache.kafka.common.errors.NetworkException
> It is resolved. It was because we exceeded the number of allowed
> partitions for the kafka cluster (AWS MSK cluster). Have deleted
> unused topics and partitions to resolve the issue.
>
> Problem 1:-
> 
> I increased the kafka partition and flink parallelism to 45 and the
> throughput has improved from 20 minutes to 14 minutes (20K records).
> Can you check the flink graph and let me know if there is anything
> else that can be done here to improve the throughput further.
>
> Thanks
>
> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>  wrote:
> >
> > Hi Fabian,
> >
> > Problem 1:-
> > -
> > I have removed the print out sink's and ran the test again. This time
> > the throughput is 17 minutes for 20K records (200 records every
> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
> > and kafka partition of 15)
> >
> > Please find the attached application graph. Can you suggest what else
> > is required further to improve the throughput.
> >
> > Problem 2:-
> > -
> > Also, I tried to increase the parallelism to 45 from 15 (also
> > increasing the kafka partition to 45 from 15) to see if this helps in
> > getting a better throughput.
> >
> > After increasing the partition, I am facing the Network issue with
> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
> > with 15 partitions for the kafka topic. This could be an issue with
> > the Kafka cluster?
> >
> > Kafka Cluster Configuration:-
> > ---
> > auto.create.topics.enable=true
> > log.retention.hours=24
> > default.replication.factor=3
> > min.insync.replicas=2
> > num.io.threads=45
> > num.network.threads=60
> > num.partitions=45
> > num.replica.fetchers=2
> > unclean.leader.election.enable=true
> > replica.lag.time.max.ms=3
> > zookeeper.session.timeout.ms=18000
> > log.retention.ms=17280
> > log.cleanup.policy=delete
> > group.max.session.timeout.ms=120
> >
> > Exception:-
> > 
> >  "locationInformation":
> >
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> > "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> > "message": "Error during disposal of stream operator.",
> > "throwableInformation": [
> > "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> > Failed to send data to Kafka: Failed to send data to Kafka: The server
> > disconnected
> >
> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
> > server disconnected before a response was received."
> >
> >
> > Thanks
> >
> >
> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul 
> wrote:
> > >
> > > Hi Mohammed,
> > >
> > > 200records should definitely be doable. The first you can do is remove
> the print out Sink because they are increasing the load on your cluster due
> to the additional IO
> > > operation and secondly preventing Flink from fusing operators.
> > > I am interested to see the updated job graph after the removal of the
> print sinks.
> > >
> > > Best,
> > > Fabian
>


Re: Flink Performance Issue

2021-09-02 Thread Mohammed Kamaal
Hi Fabian,

Just an update,

Problem 2:-

Caused by: org.apache.kafka.common.errors.NetworkException
It is resolved; it happened because we exceeded the number of allowed
partitions for the Kafka cluster (AWS MSK cluster). We have deleted
unused topics and partitions to resolve the issue.

Problem 1:-

I increased the Kafka partitions and Flink parallelism to 45, and the
throughput has improved from 20 minutes to 14 minutes (20K records).
Can you check the Flink graph and let me know if there is anything
else that can be done here to improve the throughput further?

Thanks

On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
 wrote:
>
> Hi Fabian,
>
> Problem 1:-
> -
> I have removed the print out sink's and ran the test again. This time
> the throughput is 17 minutes for 20K records (200 records every
> second). Earlier it was 20 minutes for 20K records. (parallelism 15
> and kafka partition of 15)
>
> Please find the attached application graph. Can you suggest what else
> is required further to improve the throughput.
>
> Problem 2:-
> -
> Also, I tried to increase the parallelism to 45 from 15 (also
> increasing the kafka partition to 45 from 15) to see if this helps in
> getting a better throughput.
>
> After increasing the partition, I am facing the Network issue with
> Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
> with 15 partitions for the kafka topic. This could be an issue with
> the Kafka cluster?
>
> Kafka Cluster Configuration:-
> ---
> auto.create.topics.enable=true
> log.retention.hours=24
> default.replication.factor=3
> min.insync.replicas=2
> num.io.threads=45
> num.network.threads=60
> num.partitions=45
> num.replica.fetchers=2
> unclean.leader.election.enable=true
> replica.lag.time.max.ms=3
> zookeeper.session.timeout.ms=18000
> log.retention.ms=17280
> log.cleanup.policy=delete
> group.max.session.timeout.ms=120
>
> Exception:-
> 
>  "locationInformation":
> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
> "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
> "message": "Error during disposal of stream operator.",
> "throwableInformation": [
> "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
> Failed to send data to Kafka: Failed to send data to Kafka: The server
> disconnected
>
> "Caused by: org.apache.kafka.common.errors.NetworkException: The
> server disconnected before a response was received."
>
>
> Thanks
>
>
> On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul  wrote:
> >
> > Hi Mohammed,
> >
> > 200records should definitely be doable. The first you can do is remove the 
> > print out Sink because they are increasing the load on your cluster due to 
> > the additional IO
> > operation and secondly preventing Flink from fusing operators.
> > I am interested to see the updated job graph after the removal of the print 
> > sinks.
> >
> > Best,
> > Fabian


Re: Flink Performance Issue

2021-08-25 Thread Fabian Paul
Hi Mohammed,

200 records/s should definitely be doable. The first thing you can do is remove
the print-out sinks, because they increase the load on your cluster due to the
additional IO and, secondly, prevent Flink from fusing operators (see the sketch
below).
I am interested to see the updated job graph after the removal of the print
sinks.
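For illustration, this is the kind of change meant above, using the stream
names from the original post's code (reproduced at the end of this thread);
each print() registers an additional sink that consumes the same stream and
logs every record:

// Before: a debugging sink attached after every step.
initialStreamData.print();
rawDataProcess.print();
cgmStreamData.print();

// After: remove the print() calls (or guard them behind a debug flag) so the
// records flow from the source through the flatMaps and windows to the Kafka
// sink without the extra logging IO and the extra stream consumers.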

Best,
Fabian

Re: Flink Performance Issue

2021-08-24 Thread Fabian Paul
Hi Mohammed,

Without diving too much into your business logic, one thing that catches my eye
is the partitioning you are using. In general, all calls to `keyBy` or
`rebalance` are very expensive because all the data is shuffled across
downstream tasks. Flink tries to fuse operators with the same key groups
together so that there is no communication overhead between them, but this is
not possible if a shuffle sits between them.
One example is your cgmStream, which is first distributed by a specified key
and then rebalanced right after it (see the sketch below).
When applying a `keyBy` operation it is also important to understand what the
key distribution in your input data looks like. It may happen that specific
keys occur very frequently while others appear far less often; this can cause
skew in your pipeline that cannot be resolved with higher parallelism (some
tasks are overloaded, some are idle).
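A concrete sketch of the first point, based on the cgmStream definition in the
original post's code (the <CGM> element type is an assumption; only the
partitioning calls matter here):

// As posted: the stream is hash-partitioned by PERSON_ID via keyBy() and then
// immediately redistributed round-robin by rebalance(), so the placement that
// the keyBy shuffle just paid for is thrown away again.
DataStream<CGM> cgmStream = rawDataProcess
        .keyBy(new ReProcessorKeySelector())
        .rebalance()
        .flatMap(new SgStreamTask());

// Without the rebalance(): records stay on the subtask that owns their key
// group, and only the keyBy() shuffle remains between the operators.
DataStream<CGM> cgmStreamKeyed = rawDataProcess
        .keyBy(new ReProcessorKeySelector())
        .flatMap(new SgStreamTask());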

I also have a couple of follow-up questions to better understand your setup:

- What do you mean by 20k concurrent stream data: 20k records per second?
- How many taskmanagers are you using and how are the slots distributed?
- Can you check in the Flink WebUI whether some operators are idle, and maybe
share an image of the job graph?
- How did you notice the lag of 2k between the operators?

Best,
Fabian




Flink Performance Issue

2021-08-24 Thread Mohammed Kamaal
Hi,

Apologies for the long message; I wanted to explain the issue in detail.

We have a Flink (version 1.8) application running on AWS Kinesis Analytics. The
application has a source which is a Kafka topic with 15 partitions (AWS Managed
Streaming for Kafka) and the sink is again a Kafka topic with 15 partitions.

Each stream record is about 4 KB, so 20K records amount to roughly
20K * 4 KB = ~79 MB.

The application performs some complex business logic on the data and produces
the output to the Kafka topic.

As part of the performance test, processing 20K concurrent stream records
(unique keys) currently takes 25 minutes.

Our target is to process 20K concurrent stream records in 5 minutes.
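(In rate terms that is 20,000 records / 25 min, i.e. about 13 records/s today,
versus 20,000 records / 5 min, i.e. about 67 records/s targeted: roughly a 5x
gap.)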

I have checked the code and applied all the optimizations I could to the
business logic, but I still don't see any improvement.

I tried increasing the parallelism from 5 to 8, but the throughput is the same
with both 5 and 8.

I can also see that the stream is distributed across all 8 slots, though there
is a lag of 2K records between the first operator and the next consecutive
operators.

Checkpointing is enabled with the Kinesis Analytics default of once every minute.

I have also tried setting a different parallelism for each of the operators.

Can you please suggest any other performance optimizations that I should
consider, or point out if I am making a mistake here?

Here is my sample code
--

StreamExecutionEnvironment streamenv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream initialStreamData = streamenv.addSource(new
FlinkKafkaConsumer<>(TOPIC_NAME, new ObjectNodeJsonDeSerializerSchema(),
kafkaConnectProperties));
initialStreamData.print();

DataStream rawDataProcess = initialStreamData.rebalance().flatMap(new
ReProcessingDataProvider()).keyBy(value -> value.getPersonId());
rawDataProcess.print();

DataStream cgmStreamData = rawDataProcess.keyBy(new
ReProcessorKeySelector()).rebalance().flatMap(new SgStreamTask()); //the same
person_id key
cgmStreamData.print();

DataStream artfctOverlapStream = cgmStreamData.keyBy(new
CGMKeySelector()).countWindow(2, 1)
.apply(new ArtifactOverlapProvider()); //the same person_id key
artfctOverlapStream.print();

DataStream streamWithSgRoc = artfctOverlapStream.keyBy(new
CGMKeySelector()).countWindow(7, 1)
.apply(new SgRocProvider()); // the same person_id key
streamWithSgRoc.print();

DataStream cgmExcursionStream = streamWithSgRoc.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE).apply(new CGMExcursionProviderStream());
//the same person_id key
cgmExcursionStream.print();

cgmExcursionStream.addSink(new FlinkKafkaProducer(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
CGMDataCollectorSchema(),
kafkaConnectProperties));

Thanks