Hi Sandeep,

Thanks a lot for sharing! On a separate note, I see you are using
KafkaIO.write, but not with EOS (exactly-once semantics). From my
understanding, enabling checkpointing alone will not be enough to
guarantee no message loss, right? I pasted part of my DAG with KafkaIO
EOS enabled below. I also read from and write to Kafka with KafkaIO.

[image: image.png]
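
For reference, enabling EOS looks roughly like this in our code (a
minimal sketch; the broker address, topic, shard count, and sink group
id below are placeholders, not my real values):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Applied to a PCollection<KV<String, String>> just before the sink.
    KafkaIO.Write<String, String> eosWrite =
        KafkaIO.<String, String>write()
            .withBootstrapServers("broker-1:9092")      // placeholder
            .withTopic("output-topic")                  // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // withEOS routes the write through Kafka transactions:
            // numShards fixes the number of transactional writers, and
            // the sink group id must be unique per sink in the cluster.
            .withEOS(18, "eos-sink-group");             // placeholders

My understanding is that EOS also needs Kafka 0.11+ on the broker side
and checkpointing enabled in Flink, since the transactional commits are
tied to checkpoint completion.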
Thanks a lot!
Eleanore

On Mon, Aug 10, 2020 at 11:07 AM Kathula, Sandeep <
[email protected]> wrote:

> Hi Eleanore,
>
>                 We are also observing that a few task managers are able
> to keep up with the incoming load while others are lagging behind after
> restarting from the savepoint with lower parallelism. Not all task
> managers are affected by this problem. We repeated this test multiple
> times to confirm.
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>
> *From: *"Kathula, Sandeep" <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, August 10, 2020 at 11:04 AM
> *To: *"[email protected]" <[email protected]>, "
> [email protected]" <[email protected]>
> *Cc: *"Vora, Jainik" <[email protected]>, "Benenson, Mikhail" <
> [email protected]>, "Deshpande, Omkar" <
> [email protected]>, "LeVeck, Matt" <[email protected]>
> *Subject: *Re: Beam flink runner job not keeping up with input rate after
> downscaling
>
>
>
> Hi Eleanore,
>
>                     Our DAG:
>
> Source: Strip Metadata/EventBusIO.Read/Read Bytes From Kafka/Read(KafkaUnboundedSource)
>   -> Flat Map
>   -> Strip Metadata/EventBusIO.Read/MapElements/Map/ParMultiDo(Anonymous)
>   -> Strip Metadata/EventBusIO.Read/Decode EB Bytes/ParMultiDo(EbExtractor)
>   -> Strip Metadata/MapElements/Map/ParMultiDo(Anonymous)
>   -> Filter Unreadeable Messages/ParDo(Anonymous)/ParMultiDo(Anonymous)
>   -> Extract Events/Map/ParMultiDo(Anonymous)
>   -> UaEnrichEvent/ParMultiDo(UserAgentEnrichment)
>   -> IpEnrichEvent/ParMultiDo(GeoEnrichment)
>   -> Keyless Write/MapElements/Map/ParMultiDo(Anonymous)
>   -> Keyless Write/EventBusIO.Write/ParDo(EbFormatter)/ParMultiDo(EbFormatter)
>   -> Keyless Write/EventBusIO.Write/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
>   -> Keyless Write/EventBusIO.Write/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
>
>
>
>
>
>             We read from and write to *Kafka*.
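>
> The Kafka read at the head of the DAG (the EventBusIO.Read wrapper) is
> roughly the following sketch; the broker and topic names here are
> placeholders, not our real ones:
>
>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>     import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
>     // The "Read Bytes From Kafka" step; the downstream ParDos decode
>     // and enrich the raw bytes.
>     KafkaIO.Read<byte[], byte[]> source =
>         KafkaIO.<byte[], byte[]>read()
>             .withBootstrapServers("broker-1:9092")    // placeholder
>             .withTopic("input-topic")                 // placeholder
>             .withKeyDeserializer(ByteArrayDeserializer.class)
>             .withValueDeserializer(ByteArrayDeserializer.class);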
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>
>
>
> *From: *Eleanore Jin <[email protected]>
> *Reply-To: *"[email protected]" <[email protected]>
> *Date: *Monday, August 10, 2020 at 10:31 AM
> *To: *"[email protected]" <[email protected]>
> *Subject: *Re: Beam flink runner job not keeping up with input rate after
> downscaling
>
>
> Hi Sandeep,
>
>
>
> Can you please share your DAG? Does your job read from and write to some sink?
>
>
>
> Thanks a lot!
>
>
>
> On Mon, Aug 10, 2020 at 9:27 AM Kathula, Sandeep <
> [email protected]> wrote:
>
> Hi,
>
>    We started a Beam application with the Flink runner with a
> parallelism of 50. It is a *stateless application.* With the initial
> parallelism of 50, our application is able to process up to *50,000
> records* per second. After a week, we took a savepoint and restarted
> from the savepoint with a parallelism of *18.* We are seeing that our
> application is only able to process *7,000* records per second, but we
> expect it to process almost 18,000 records per second. Records
> processed per task manager are almost *half* of what each used to
> process previously with 50 task managers.
>
>
>
>  When we started a new application with 18 pods without any savepoint,
> it is able to process ~18,500 records per second. This problem *occurs
> only when we downscale after taking a savepoint*. We ported the same
> application to a simple *Flink application without Apache Beam*, and
> there *it scales well without any issues* after restarting from a
> savepoint with lower parallelism. So the problem should be with Apache
> Beam or some config we are passing to Beam/Flink. We are using the
> following config:
>
>
>
> numberOfExecutionRetries=2
>
> externalizedCheckpointsEnabled=true
>
> retainExternalizedCheckpointsOnCancellation=true
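>
> If it helps, these are FlinkPipelineOptions; setting them
> programmatically looks roughly like the sketch below (the checkpoint
> interval is illustrative, not what we run with):
>
>     import org.apache.beam.runners.flink.FlinkPipelineOptions;
>     import org.apache.beam.runners.flink.FlinkRunner;
>     import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
>     FlinkPipelineOptions opts =
>         PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
>     opts.setRunner(FlinkRunner.class);
>     opts.setParallelism(18);
>     opts.setNumberOfExecutionRetries(2);
>     opts.setExternalizedCheckpointsEnabled(true);
>     opts.setRetainExternalizedCheckpointsOnCancellation(true);
>     opts.setCheckpointingInterval(60000L);  // illustrative, in ms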
>
>
>
>
>
> We didn’t set maxParallelism in our Beam application; we only specified
> parallelism.
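>
> Setting it explicitly would be one more line on the options sketch
> above (128 is an illustrative value; in Flink, maxParallelism fixes the
> number of key groups and, if left unset, is derived from the
> parallelism the job first runs with):
>
>     opts.setMaxParallelism(128);  // illustrative value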
>
>
>
> Beam version - 2.19
>
> Flink version - 1.9
>
>
>
> Any suggestions/help would be appreciated.
>
>
>
>
>
> Thanks
>
> Sandeep Kathula
>
>
>
>
>
>
