Hi Vincent,

Did you mean <=3000, or did you want that to be <=30000?

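For reference, here is a minimal sketch of the same bucketing with thresholds that match the counter names (all values in milliseconds). `LatencyBuckets` and `bucketFor` are hypothetical names used only for illustration; they are not part of Beam or of the original pipeline.

With the original `<= 3000` check, any latency between 3 s and 60 s is counted under `30s_60s`. Part of the reported 30s_60s share may therefore actually be 3 s to 30 s latencies.

```java
/**
 * Hypothetical helper (not part of the original pipeline): maps an event-time
 * latency in milliseconds to the bucket label used by the counters in the pipeline.
 * All thresholds are in milliseconds, so 30 s is 30_000, not 3_000.
 */
public final class LatencyBuckets {

    private LatencyBuckets() {}

    // Inclusive upper bounds in ms, index-aligned with the counter names below.
    private static final long[] UPPER_BOUNDS_MS = {30_000L, 60_000L, 90_000L, 120_000L, 240_000L};
    private static final String[] LABELS = {"0s_30s", "30s_60s", "60s_90s", "90s_120s", "120s_240s"};

    /** Returns the label of the first bucket whose upper bound is >= latencyMs. */
    public static String bucketFor(long latencyMs) {
        for (int i = 0; i < UPPER_BOUNDS_MS.length; i++) {
            if (latencyMs <= UPPER_BOUNDS_MS[i]) {
                return LABELS[i];
            }
        }
        // Anything above 240 s falls into the open-ended bucket.
        return "240s_infs";
    }

    public static void main(String[] args) {
        // 10 s: with the original `<= 3000` check this would be counted as 30s_60s.
        System.out.println(bucketFor(10_000L));   // 0s_30s
        System.out.println(bucketFor(45_000L));   // 30s_60s
        System.out.println(bucketFor(300_000L));  // 240s_infs
    }
}
```

Keeping the bounds and labels in two parallel arrays keeps each threshold next to its counter name. That makes a unit slip like 3000 vs 30000 easier to spot than in a long if/else chain.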
Cheers
Reza

On Fri, 8 May 2020 at 04:23, Vincent Domingues wrote:
> Hi all,
>
> We are trying to use Beam on the Flink runner to consume PubSub messages.
>
> We are facing a latency issue even with very low PubSub throughput.
>
> For example, if you try the following simple Beam pipeline consuming a
> PubSub subscription:
>
> -----------------------------------------------------------------------------------
>
> package org.apache.beam.examples;
>
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.metrics.Counter;
> import org.apache.beam.sdk.metrics.Distribution;
> import org.apache.beam.sdk.metrics.Metrics;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.joda.time.Instant;
>
> import java.util.logging.Logger;
>
> public class PubSubBeam {
>
>     private final static Logger LOGGER =
>         Logger.getLogger(PubSubBeam.class.getName());
>
>     public interface MyOptions extends PipelineOptions {
>         @Description("Topic to use in PubSub")
>         String getInputSubscription();
>         void setInputSubscription(String value);
>     }
>
>     static class LogMessages extends DoFn<String, String> {
>         private final Distribution distribution =
>             Metrics.distribution(this.getClass().getName(), "latency-distribution");
>         private final Counter counter_30 =
>             Metrics.counter(this.getClass().getName(), "0s_30s");
>         private final Counter counter_60 =
>             Metrics.counter(this.getClass().getName(), "30s_60s");
>         private final Counter counter_90 =
>             Metrics.counter(this.getClass().getName(), "60s_90s");
>         private final Counter counter_120 =
>             Metrics.counter(this.getClass().getName(), "90s_120s");
>         private final Counter counter_240 =
>             Metrics.counter(this.getClass().getName(), "120s_240s");
>         private final Counter counter_inf =
>             Metrics.counter(this.getClass().getName(), "240s_infs");
>         private final Counter total =
>             Metrics.counter(this.getClass().getName(), "total");
>
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             Long latency = Instant.now().getMillis() -
>                 c.timestamp().getMillis();
>             if (latency <= 3000){
>                 counter_30.inc();
>             }
>             else if (latency <= 60000){
>                 counter_60.inc();
>             }
>             else if (latency <= 90000){
>                 counter_90.inc();
>             }
>             else if (latency <= 120000){
>                 counter_120.inc();
>             }
>             else if (latency <= 240000){
>                 counter_240.inc();
>             }
>             else if (latency > 240000){
>                 counter_inf.inc();
>             }
>             total.inc();
>             distribution.update(latency);
>         }
>     }
>
>     public static void main(String[] args) {
>         MyOptions options =
>             PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
>         Pipeline p = Pipeline.create(options);
>
>         p.apply("Read PubSub Messages",
>                 PubsubIO.readStrings().fromSubscription(options.getInputSubscription()))
>             .apply(ParDo.of(new LogMessages()));
>
>         p.run();
>     }
> }
>
> -----------------------------------------------------------------------------------
>
> If you collect the pipeline metrics, you'll get this latency distribution:
>
> 0s_30s:    31.81%
> 30s_60s:   64.89%
> 60s_90s:    2.73%
> 90s_120s:   0.44%
> 120s_240s:  0.13%
> 240s_infs:  0.01%
> total:    100.00%
>
> With almost no operations in our pipeline, 64% of our messages need between
> 30 and 60 seconds to get acknowledged.
>
> Has anyone already faced this situation, or is it a known issue?
> I am interested in any clue for dealing with this latency issue.
>
> Thanks for your help.
> Stay safe
>
> Regards,
> Vincent
