Hi Eleanore,
Our DAG:
Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
  -> Flat Map
  -> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
  -> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
  -> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
  -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
  -> Extract Events/Map/ParMultiDo(Anonymous)
  -> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
  -> IpEnrichEvent/ParMultiDo(GeoEnrichment)
  -> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
  -> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
  -> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
  -> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
We read from and write to Kafka.
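In simplified Beam code, the shape is roughly the sketch below. EventBusIO is our internal wrapper around KafkaIO, so the brokers, topics, and the decode/enrich steps here are just placeholders, not our real implementation:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class EventPipelineSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        p.apply("Read Bytes From Kafka", KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("kafka:9092")                 // placeholder broker
                .withTopic("input-events")                          // placeholder topic
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withoutMetadata())
            // stand-in for Strip Metadata / Decode EB Bytes (EbExtractor)
            .apply("Decode EB Bytes", MapElements
                .into(TypeDescriptors.strings())
                .via((KV<byte[], byte[]> kv) ->
                    new String(kv.getValue(), StandardCharsets.UTF_8)))
            // stand-in for the UaEnrichEvent / IpEnrichEvent DoFns
            .apply("Enrich Event", MapElements
                .into(TypeDescriptors.strings())
                .via((String event) -> event))
            // stand-in for Keyless Write / EventBusIO.Write
            .apply("Keyless Write", KafkaIO.<Void, String>write()
                .withBootstrapServers("kafka:9092")
                .withTopic("output-events")
                .withValueSerializer(StringSerializer.class)
                .values());

        p.run().waitUntilFinish();
      }
    }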
Thanks
Sandeep Kathula
From: Eleanore Jin <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, August 10, 2020 at 10:31 AM
To: "[email protected]" <[email protected]>
Subject: Re: Beam flink runner job not keeping up with input rate after
downscaling
Hi Sandeep,
Can you please share your DAG? Does your job read from and write to some sink?
Thanks a lot!
On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep
<[email protected]<mailto:[email protected]>> wrote:
Hi,
We started a Beam application with the Flink runner with a parallelism of 50. It
is a stateless application. With the initial parallelism of 50, our application is
able to process up to 50,000 records per second. After a week, we took a
savepoint and restarted from the savepoint with a parallelism of 18. We are
seeing that our application is only able to process 7,000 records per second, but
we expect it to process almost 18,000 records per second. The records processed per
task manager are almost half of what each task manager used to process previously
with 50 task managers.
When we started a new application with 18 pods without any savepoint, it was
able to process ~18,500 records per second. This problem occurs only when we
downscale after taking a savepoint. We ported the same application to a plain Flink
application without Apache Beam, and there it scales well without any issues
after restarting from a savepoint with lower parallelism. So the problem should
be with Apache Beam or some config we are passing to Beam/Flink. We are using
the following config:
numberOfExecutionRetries=2
externalizedCheckpointsEnabled=true
retainExternalizedCheckpointsOnCancellation=true
We didn’t set maxParallelism in our Beam application; we only specified
parallelism (see the sketch below).
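In code, the options above look roughly like this sketch (assuming the names map one-to-one onto the FlinkPipelineOptions setters; the parallelism values are the ones from this thread):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setNumberOfExecutionRetries(2);
    options.setExternalizedCheckpointsEnabled(true);
    options.setRetainExternalizedCheckpointsOnCancellation(true);
    options.setParallelism(18);        // 50 before the savepoint, 18 after the restore
    // options.setMaxParallelism(...)  // not set anywhere in our job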
Beam version - 2.19
Flink version - 1.9
Any suggestions/help would be appreciated.
Thanks
Sandeep Kathula