Hi Eleanore,
                We are also observing that, after restarting from the savepoint with lower parallelism, some task managers are able to keep up with the incoming load while a few others lag behind. Not all task managers are affected by this problem. We repeated this test multiple times to confirm.

Thanks
Sandeep Kathula

From: "Kathula, Sandeep" <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, August 10, 2020 at 11:04 AM
To: "[email protected]" <[email protected]>, "[email protected]" 
<[email protected]>
Cc: "Vora, Jainik" <[email protected]>, "Benenson, Mikhail" 
<[email protected]>, "Deshpande, Omkar" <[email protected]>, 
"LeVeck, Matt" <[email protected]>
Subject: Re: Beam flink runner job not keeping up with input rate after 
downscaling

Hi Eleanore,
                    Our DAG:
Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource) ->
Flat Map ->
Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous) ->
Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor) ->
Strip Metadata/MapElements/Map/ParMultiDo(Anonymous) ->
Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous) ->
Extract Events/Map/ParMultiDo(Anonymous) ->
UaEnrichEvent/ParMultiDo(UserAgentEnrichment) ->
IpEnrichEvent/ParMultiDo(GeoEnrichment) ->
Keyless Write/MapElements/Map/ParMultiDo(Anonymous) ->
Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter) ->
Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) ->
Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)


            We read from and write to Kafka; a simplified sketch of that shape is below.
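
For reference, here is a minimal sketch of that read-process-write shape. The broker address, topic names, and class name are placeholders and the decode/filter/enrich ParDos are elided, so this is not our actual code:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class KeylessWriteSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("Read Bytes From Kafka",
            KafkaIO.readBytes()
                .withBootstrapServers("broker:9092")     // placeholder
                .withTopic("input-topic")                // placeholder
                .withoutMetadata())
        // decode / filter / enrich steps elided; each is a stateless ParDo or MapElements
        .apply("Keyless Write",
            KafkaIO.<byte[], byte[]>write()
                .withBootstrapServers("broker:9092")     // placeholder
                .withTopic("output-topic")               // placeholder
                .withKeySerializer(ByteArraySerializer.class)
                .withValueSerializer(ByteArraySerializer.class));

    p.run();
  }
}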

Thanks
Sandeep Kathula


From: Eleanore Jin <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, August 10, 2020 at 10:31 AM
To: "[email protected]" <[email protected]>
Subject: Re: Beam flink runner job not keeping up with input rate after 
downscaling

Hi Sandeep,

Can you please share your DAG? Does your job read from a source and write to some sink?

Thanks a lot!

On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep <[email protected]> wrote:
Hi,
   We started a Beam application on the Flink runner with a parallelism of 50. It is a stateless application. With the initial parallelism of 50, our application is able to process up to 50,000 records per second. After a week, we took a savepoint and restarted from that savepoint with a parallelism of 18. We are seeing that our application is only able to process 7,000 records per second, although we expected it to process almost 18,000 records per second. The records processed per task manager are almost half of what each task manager used to process previously with 50 task managers.

 When we started a new application with 18 pods without any savepoint, it was able to process ~18,500 records per second. This problem occurs only when we downscale after taking a savepoint. We ported the same application to a plain Flink application without Apache Beam, and there it scales well, without any issues, after restarting from a savepoint with lower parallelism. So the problem should be with Apache Beam or with some config we are passing to Beam/Flink. We are using the following config:

numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true


We did not set maxParallelism in our Beam application; we only specified parallelism.
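
For completeness, the sketch below shows one way these options could be set programmatically via FlinkPipelineOptions (an assumption about how they map; our job may pass them differently), with maxParallelism commented out since we do not set it:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionsSketch {
  public static FlinkPipelineOptions buildOptions() {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setParallelism(18);                                // 50 before the downscale
    options.setNumberOfExecutionRetries(2);
    options.setExternalizedCheckpointsEnabled(true);
    options.setRetainExternalizedCheckpointsOnCancellation(true);
    // options.setMaxParallelism(...);                         // not set in our application
    return options;
  }
}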

Beam version - 2.19
Flink version - 1.9

Any suggestions/help would be appreciated.


Thanks
Sandeep Kathula

