Hi Vincent,

Did you mean <=3000 or did you want that to be <=30000?

Cheers
Reza

On Fri, 8 May 2020 at 04:23, Vincent Domingues <
[email protected]> wrote:

> Hi all,
>
> We are trying to work with Beam on Flink runner to consume PubSub messages.
>
> We are facing latency issue even with very low PubSub throughput.
>
> For example if you try the following simple beam pipeline consuming a
> PubSub subscription :
>
>
> -----------------------------------------------------------------------------------
>
> package org.apache.beam.examples;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.metrics.Counter;
> import org.apache.beam.sdk.metrics.Distribution;
> import org.apache.beam.sdk.metrics.Metrics;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.joda.time.Instant;
>
> import java.util.logging.Logger;
>
> public class PubSubBeam {
>
>     private final static Logger LOGGER =
> Logger.getLogger(PubSubBeam.class.getName());
>
>     public interface MyOptions extends PipelineOptions {
>         @Description("Topic to use in PubSub")
>         String getInputSubscription();
>         void setInputSubscription(String value);
>     }
>
>     static class LogMessages extends DoFn<String, String> {
>         private final Distribution distribution =
> Metrics.distribution(this.getClass().getName(), "latency-distribution");
>         private final Counter counter_30 =
> Metrics.counter(this.getClass().getName(), "0s_30s");
>         private final Counter counter_60 =
> Metrics.counter(this.getClass().getName(), "30s_60s");
>         private final Counter counter_90 =
> Metrics.counter(this.getClass().getName(), "60s_90s");
>         private final Counter counter_120 =
> Metrics.counter(this.getClass().getName(), "90s_120s");
>         private final Counter counter_240 =
> Metrics.counter(this.getClass().getName(), "120s_240s");
>         private final Counter counter_inf =
> Metrics.counter(this.getClass().getName(), "240s_infs");
>         private final Counter total =
> Metrics.counter(this.getClass().getName(), "total");
>
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             Long latency = Instant.now().getMillis() -
> c.timestamp().getMillis();
>             if (latency <= 3000){
>                 counter_30.inc();
>             }
>             else if (latency <= 60000){
>                 counter_60.inc();
>             }
>             else if (latency <= 90000){
>                 counter_90.inc();
>             }
>             else if (latency <= 120000){
>                 counter_120.inc();
>             }
>             else if (latency <= 240000){
>                 counter_240.inc();
>             }
>             else if (latency > 240000){
>                 counter_inf.inc();
>             }
>             total.inc();
>             distribution.update(latency);
>         }
>     }
>
>     public static void main(String[] args) {
>         MyOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
>         Pipeline p = Pipeline.create(options);
>
>         p.apply("Read PubSub Messages",
> PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
>          .apply(ParDo.of(new LogMessages()));
>
>         p.run();
>     }
> }
>
>
> -----------------------------------------------------------------------------------
>
> If you recover pipeline metrics you'll get this latency distribution:
>
> 0s_30s: 31.81%
> 30s_60s: 64.89%
> 60s_90s: 2.73%
> 90s_120s: 0.44%
> 120s_240s: 0.13%
> 240s_infs: 0.01%
> total: 100.00%
>
>
> With almost no operations on our pipeline 64% of our messages need between
> 30 to 60 seconds to get acknowledged.
>
> Someone already faced this situation or it is a known issue ?
> I am interested on any clue to deal with this latency issue.
>
> Thanks for your help
> Stay safe
>
> Regards,
> Vincent
>

Reply via email to