Hi Kamaal,

I would first suggest understanding the performance bottleneck, before
applying any optimizations.

Idea 1: Are your CPUs fully utilized?
if yes, good, then scaling up will probably help
If not, then there's another inefficiency

Idea 2: How fast can you get the data into your job, without any processing?
You can measure this by submitting a simple Flink job that just reads the
data and writes it to a discarding sink. Either disable the operator
chaining to get metrics for the records per second, or add a custom mapper
in between that measures the throughput.
Ideally you see here that you can read all your data in a few seconds, if
not, then there's a problem getting your data in.

Idea 3: is your IO fully utilized ?( if you are checkpointing to RocksDB,
the disk can dramatically slow you down)
Idea 4: Are you under high memory pressure, and your JVMs are spending most
of their cycles garbage collecting?

My bet is you are not getting data into your cluster as fast as you think
(Idea 2)


On Wed, Sep 22, 2021 at 12:05 PM Mohammed Kamaal <
mohammed.kamaa...@gmail.com> wrote:

> Hi Arvid,
>
> The throughput has decreased further after I removed all the rebalance().
> The performance has decreased from 14 minutes for 20K messages to 20
> minutes for 20K messages.
>
> Below are the tasks that the flink application is performing. I am using
> keyBy and Window operation. Do you think am I making any mistake here or
> the way I am performing the keyBy or Window operation needs to be
> corrected?.
>
> //Add Source
> StreamExecutionEnvironment streamenv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> initialStreamData = streamenv.addSource(new
> FlinkKafkaConsumer<>(topicsProperties.getProperty(Common.CGM_STREAM_TOPIC),
> new *ObjectNodeJsonDeSerializerSchema()*,
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> DataStream<CGM> cgmStreamData = initialStreamData.keyBy(value ->
> value.findValue("PERSON_ID").asText())
> .flatMap(new *SgStreamingTask()*).setParallelism(Common.FORTY_FIVE);
>
> DataStream<CGM> artfctOverlapStream = cgmStreamData.keyBy(new
> CGMKeySelector()).countWindow(2, 1)
> .apply(new *ArtifactOverlapProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream<CGM> streamWithSgRoc = artfctOverlapStream.keyBy(new
> CGMKeySelector()).countWindow(7, 1)
> .apply(new *SgRocProvider()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> DataStream<CGMDataCollector> cgmExcursionStream =
> streamWithSgRoc.keyBy(new CGMKeySelector())
> .countWindow(Common.THREE, Common.ONE).apply(new
> *CGMExcursionProviderStream()*
> ).setParallelism(Common.FORTY_FIVE).rebalance();
>
> //Add Sink
> cgmExcursionStream.addSink(new FlinkKafkaProducer<CGMDataCollector>(
> topicsProperties.getProperty(Common.CGM_EVENT_TOPIC), new
> CGMDataCollectorSchema(),
> kafkaConnectProperties)).setParallelism(Common.FORTY_FIVE);
>
> *Implementation classes:-*
>
> //deserialize the json message received
> *ObjectNodeJsonDeSerializerSchema* implements
> KeyedDeserializationSchema<ObjectNode>{
> public ObjectNode deserialize(byte[] messageKey, byte[] message, String
> topic, int partition, long offset);
> }
>
> //Flapmap to check each message and apply validation
> public class *SgStreamingTask* extends RichFlatMapFunction<ObjectNode,
> CGM> {
> void flatMap(ObjectNode streamData, Collector<CGM> out);
> }
>
> //persist three state variables and apply business logic
> public class *ArtifactOverlapProvider* extends RichFlatMapFunction<CGM,
> Tuple2<Long, Long>>
> implements WindowFunction<CGM, CGM, String, GlobalWindow> {
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGM> out);
> }
>
> //Apply business logic
> public class *SgRocProvider* implements WindowFunction<CGM, CGM, String,
> GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGM> out);
> }
>
> //persist 3 state variables and apply business logic
> public class *CGMExcursionProviderStream* extends
> RichFlatMapFunction<CGM, Tuple2<Long, Long>>
> implements WindowFunction<CGM, CGMDataCollector, String, GlobalWindow>{
> public void apply(String key, GlobalWindow window, Iterable<CGM> values,
> Collector<CGMDataCollector> out);
>
> }
>
> Thanks
> Kamaal
>
>
> On Mon, Sep 6, 2021 at 9:57 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Mohammed,
>>
>> something is definitely wrong in your setup. You can safely say that you
>> can process 1k records per second and core with Kafka and light processing,
>> so you shouldn't even need to go distributed in your case.
>>
>> Do you perform any heavy computation? What is your flatMap doing? Are you
>> emitting lots of small records from one big record?
>>
>> Can you please remove all rebalance and report back? Rebalance is
>> counter-productive if you don't exactly know that you need it.
>>
>> On Thu, Sep 2, 2021 at 1:36 PM Mohammed Kamaal <
>> mohammed.kamaa...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Just an update,
>>>
>>> Problem 2:-
>>> ----------------
>>> Caused by: org.apache.kafka.common.errors.NetworkException
>>> It is resolved. It was because we exceeded the number of allowed
>>> partitions for the kafka cluster (AWS MSK cluster). Have deleted
>>> unused topics and partitions to resolve the issue.
>>>
>>> Problem 1:-
>>> ----------------
>>> I increased the kafka partition and flink parallelism to 45 and the
>>> throughput has improved from 20 minutes to 14 minutes (20K records).
>>> Can you check the flink graph and let me know if there is anything
>>> else that can be done here to improve the throughput further.
>>>
>>> Thanks
>>>
>>> On Wed, Sep 1, 2021 at 10:55 PM Mohammed Kamaal
>>> <mohammed.kamaa...@gmail.com> wrote:
>>> >
>>> > Hi Fabian,
>>> >
>>> > Problem 1:-
>>> > ---------------------
>>> > I have removed the print out sink's and ran the test again. This time
>>> > the throughput is 17 minutes for 20K records (200 records every
>>> > second). Earlier it was 20 minutes for 20K records. (parallelism 15
>>> > and kafka partition of 15)
>>> >
>>> > Please find the attached application graph. Can you suggest what else
>>> > is required further to improve the throughput.
>>> >
>>> > Problem 2:-
>>> > ---------------------
>>> > Also, I tried to increase the parallelism to 45 from 15 (also
>>> > increasing the kafka partition to 45 from 15) to see if this helps in
>>> > getting a better throughput.
>>> >
>>> > After increasing the partition, I am facing the Network issue with
>>> > Kafka Cluster (AWS Managed Stream Kafka). I am not getting this issue
>>> > with 15 partitions for the kafka topic. This could be an issue with
>>> > the Kafka cluster?
>>> >
>>> > Kafka Cluster Configuration:-
>>> > ---------------------------------------
>>> > auto.create.topics.enable=true
>>> > log.retention.hours=24
>>> > default.replication.factor=3
>>> > min.insync.replicas=2
>>> > num.io.threads=45
>>> > num.network.threads=60
>>> > num.partitions=45
>>> > num.replica.fetchers=2
>>> > unclean.leader.election.enable=true
>>> > replica.lag.time.max.ms=30000
>>> > zookeeper.session.timeout.ms=18000
>>> > log.retention.ms=172800000
>>> > log.cleanup.policy=delete
>>> > group.max.session.timeout.ms=1200000
>>> >
>>> > Exception:-
>>> > ----------------
>>> >  "locationInformation":
>>> >
>>> "org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:500)",
>>> >     "logger": "org.apache.flink.streaming.runtime.tasks.StreamTask",
>>> >     "message": "Error during disposal of stream operator.",
>>> >     "throwableInformation": [
>>> >
>>>  "org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
>>> > Failed to send data to Kafka: Failed to send data to Kafka: The server
>>> > disconnected
>>> >
>>> > "Caused by: org.apache.kafka.common.errors.NetworkException: The
>>> > server disconnected before a response was received."
>>> >
>>> >
>>> > Thanks
>>> >
>>> >
>>> > On Wed, Aug 25, 2021 at 12:11 PM Fabian Paul <fabianp...@ververica.com>
>>> wrote:
>>> > >
>>> > > Hi Mohammed,
>>> > >
>>> > > 200records should definitely be doable. The first you can do is
>>> remove the print out Sink because they are increasing the load on your
>>> cluster due to the additional IO
>>> > > operation and secondly preventing Flink from fusing operators.
>>> > > I am interested to see the updated job graph after the removal of
>>> the print sinks.
>>> > >
>>> > > Best,
>>> > > Fabian
>>>
>>

Reply via email to