Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using KafkaIO.write, but not with EOS (exactly-once semantics). From my understanding, enabling checkpointing alone is not enough to guarantee no message loss? I pasted part of my DAG with KafkaIO EOS enabled. I also read from and write to Kafka with KafkaIO.
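Since the pasted image may not render in plain text, here is a minimal sketch of what such an EOS write looks like (the "events" collection name, broker, topic, serializers, and shard count below are placeholders, not our actual values):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Exactly-once Kafka sink: withEOS(numShards, sinkGroupId) replaces the
    // default at-least-once writer. It needs Kafka 0.11+ and a runner that
    // supports exactly-once sinks.
    events.apply(KafkaIO.<String, byte[]>write()
        .withBootstrapServers("broker-1:9092")          // placeholder broker
        .withTopic("output-topic")                      // placeholder topic
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(ByteArraySerializer.class)
        .withEOS(20, "eos-sink-group-id"));             // placeholder shards/id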
Thanks a lot!
Eleanore

On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <[email protected]> wrote:

> Hi Eleanore,
>
> We are also observing that a few task managers are able to keep up with
> the incoming load but a few task managers are lagging behind after
> starting from the savepoint with lower parallelism. Not all task managers
> are affected by this problem. We repeated this test multiple times to
> confirm.
>
> Thanks
> Sandeep Kathula
>
> *From: *"Kathula, Sandeep" <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, August 10, 2020 at 11:04 AM
> *To: *"[email protected]" <[email protected]>, "[email protected]" <[email protected]>
> *Cc: *"Vora, Jainik" <[email protected]>, "Benenson, Mikhail" <[email protected]>, "Deshpande, Omkar" <[email protected]>, "LeVeck, Matt" <[email protected]>
> *Subject: *Re: Beam flink runner job not keeping up with input rate after downscaling
>
> Hi Eleanore,
>
> Our DAG:
>
> Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
> -> Flat Map
> -> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
> -> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
> -> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
> -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
> -> Extract Events/Map/ParMultiDo(Anonymous)
> -> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
> -> IpEnrichEvent/ParMultiDo(GeoEnrichment)
> -> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
> -> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
> -> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
> -> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
>
> We read from and write to *Kafka*.
>
> Thanks
> Sandeep Kathula
>
> *From: *Eleanore Jin <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, August 10, 2020 at 10:31 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: Beam flink runner job not keeping up with input rate after downscaling
>
> Hi Sandeep,
>
> Can you please share your DAG? Does your job read from and write to some
> sink?
>
> Thanks a lot!
>
> On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep <[email protected]> wrote:
>
> Hi,
>
> We started a Beam application with the Flink runner with a parallelism of
> 50. It is a *stateless application*. With the initial parallelism of 50,
> our application is able to process up to *50,000 records* per second.
> After a week, we took a savepoint and restarted from it with a parallelism
> of *18*. We are seeing that our application is only able to process
> *7,000* records per second, but we expected it to process almost 18,000
> records per second. Records processed per task manager were almost *half*
> of what was processed previously with 50 task managers.
>
> When we started a new application with 18 pods without any savepoint, it
> is able to process ~18,500 records per second. This problem *occurs only
> when we downscale after taking a savepoint*. We ported the same
> application to a plain *Flink application without Apache Beam*, and there
> *it scales well without any issues* after restarting from a savepoint
> with lower parallelism.
> So the problem should be with Apache Beam or some config we are passing
> to Beam/Flink. We are using the following config:
>
> numberOfExecutionRetries=2
> externalizedCheckpointsEnabled=true
> retainExternalizedCheckpointsOnCancellation=true
>
> We didn't give any maxParallelism in our Beam application; we only
> specified parallelism.
>
> Beam version - 2.19
> Flink version - 1.9
>
> Any suggestions/help would be appreciated.
>
> Thanks
> Sandeep Kathula
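For reference, the config quoted above maps onto the Flink runner's pipeline options roughly as follows (a minimal sketch, not the actual job setup; the commented-out maxParallelism call is an assumption about how it could be pinned explicitly):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch: the quoted config expressed as FlinkPipelineOptions.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setNumberOfExecutionRetries(2);
    options.setExternalizedCheckpointsEnabled(true);
    options.setRetainExternalizedCheckpointsOnCancellation(true);
    options.setParallelism(18);   // parallelism used after downscaling
    // maxParallelism was left unset above; pinning it explicitly (if the
    // option is available in this Beam version) would look like:
    // options.setMaxParallelism(128);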
