Hi all,

We are trying to use Beam with the Flink runner to consume PubSub messages.

We are facing latency issues even at very low PubSub throughput.

For example, try the following simple Beam pipeline, which consumes a PubSub
subscription:

-----------------------------------------------------------------------------------

package org.apache.beam.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.joda.time.Instant;

import java.util.logging.Logger;

public class PubSubBeam {

    private static final Logger LOGGER = Logger.getLogger(PubSubBeam.class.getName());

    public interface MyOptions extends PipelineOptions {
        @Description("PubSub subscription to read from, e.g. projects/<project>/subscriptions/<subscription>")
        String getInputSubscription();
        void setInputSubscription(String value);
    }

    static class LogMessages extends DoFn<String, String> {
        private final Distribution distribution = Metrics.distribution(this.getClass().getName(), "latency-distribution");
        private final Counter counter_30 = Metrics.counter(this.getClass().getName(), "0s_30s");
        private final Counter counter_60 = Metrics.counter(this.getClass().getName(), "30s_60s");
        private final Counter counter_90 = Metrics.counter(this.getClass().getName(), "60s_90s");
        private final Counter counter_120 = Metrics.counter(this.getClass().getName(), "90s_120s");
        private final Counter counter_240 = Metrics.counter(this.getClass().getName(), "120s_240s");
        private final Counter counter_inf = Metrics.counter(this.getClass().getName(), "240s_infs");
        private final Counter total = Metrics.counter(this.getClass().getName(), "total");

        @ProcessElement
        public void processElement(ProcessContext c) {
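            // Processing time minus the element timestamp. Without a timestamp
            // attribute, PubsubIO uses the message publish time as the element timestamp.
            long latency = Instant.now().getMillis() - c.timestamp().getMillis();
            if (latency <= 30000){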
                counter_30.inc();
            }
            else if (latency <= 60000){
                counter_60.inc();
            }
            else if (latency <= 90000){
                counter_90.inc();
            }
            else if (latency <= 120000){
                counter_120.inc();
            }
            else if (latency <= 240000){
                counter_240.inc();
            }
            else {
                counter_inf.inc();
            }
            total.inc();
            distribution.update(latency);
        }
    }

    public static void main(String[] args) {
        MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("Read PubSub Messages", PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
         .apply(ParDo.of(new LogMessages()));

        p.run();
    }
}

-----------------------------------------------------------------------------------
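The if/else chain in `processElement` can be pulled out into a pure function, so the bucket boundaries can be unit tested without running a pipeline and a threshold typo would be caught by a test. This is a minimal sketch, assuming each bucket's inclusive upper bound matches its counter name (so `0s_30s` ends at 30 000 ms); `LatencyBuckets` and `bucketFor` are hypothetical names, not part of Beam:

```java
// Hypothetical helper (not part of Beam or the original pipeline):
// maps a latency in milliseconds to the counter name used in LogMessages.
final class LatencyBuckets {
    // Inclusive upper bounds in milliseconds, assumed to match the counter names.
    private static final long[] UPPER_BOUNDS_MS = {30_000L, 60_000L, 90_000L, 120_000L, 240_000L};
    private static final String[] NAMES = {"0s_30s", "30s_60s", "60s_90s", "90s_120s", "120s_240s"};

    private LatencyBuckets() {}

    static String bucketFor(long latencyMs) {
        // Return the first bucket whose upper bound covers the latency.
        for (int i = 0; i < UPPER_BOUNDS_MS.length; i++) {
            if (latencyMs <= UPPER_BOUNDS_MS[i]) {
                return NAMES[i];
            }
        }
        // Anything above 240 s falls into the open-ended bucket.
        return "240s_infs";
    }
}
```

Inside the DoFn, the chosen name could then index a map of counters instead of a hand-written if/else chain.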

If you query the pipeline metrics, you get the following latency distribution (each bucket as a share of `total`):

0s_30s: 31.81%
30s_60s: 64.89%
60s_90s: 2.73%
90s_120s: 0.44%
120s_240s: 0.13%
240s_infs: 0.01%
total: 100.00%
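Each line of the table is a bucket counter expressed as a percentage of the `total` counter, i.e. share = 100 × bucket count / total. A minimal sketch of that arithmetic, assuming the raw counter values have already been read from the runner's metrics; `LatencyShares` and `toPercentages` are hypothetical names:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: converts raw bucket counter values into
// percentage-of-total figures like the ones in the table.
final class LatencyShares {
    private LatencyShares() {}

    static Map<String, Double> toPercentages(Map<String, Long> counts, long total) {
        if (total <= 0) {
            throw new IllegalArgumentException("total must be positive");
        }
        // LinkedHashMap keeps the buckets in the order they were supplied.
        Map<String, Double> shares = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            shares.put(e.getKey(), 100.0 * e.getValue() / total);
        }
        return shares;
    }
}
```

For example, made-up counts of 1 and 3 with a total of 4 give 25.0% and 75.0%. Because each share is rounded to two decimals in the table, the bucket lines need not sum to exactly 100.00%.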


Even though the pipeline does almost no work, 64% of our messages take between 30
and 60 seconds to be processed after being published.

Has anyone already faced this situation, or is it a known issue?
I would appreciate any clue on how to deal with this latency issue.

Thanks for your help
Stay safe

Regards,
Vincent
