Hi guys, I just wanted to give a quick update on what happened with this - after looking into it some more, it appears the extra latency was entirely caused by the publisher of the messages rather than the Beam job/PubsubIO. After tweaking the publishers to use smaller batches and to enforce a maximum batching latency, the end-to-end latency (from publishing through to Bigtable) now seems to be well under 1s, which is what I was hoping for!
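In case it's useful to anyone else on the list, the publisher-side change was roughly along these lines. This is only a minimal sketch using the google-cloud-pubsub Java client's BatchingSettings - the topic name and thresholds are illustrative rather than the exact values we settled on, and the builder methods may differ slightly between client versions:

import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import org.threeten.bp.Duration;

public class LowLatencyPublisherFactory {

  public static Publisher create() throws Exception {
    // Flush small batches quickly so messages are not held back
    // waiting for a large batch to fill up.
    BatchingSettings batching = BatchingSettings.newBuilder()
        .setElementCountThreshold(10L)            // send after 10 messages...
        .setRequestByteThreshold(50_000L)         // ...or ~50 KB of data...
        .setDelayThreshold(Duration.ofMillis(50)) // ...or 50 ms, whichever comes first
        .build();

    return Publisher.newBuilder(TopicName.of("my-project", "my-topic"))
        .setBatchingSettings(batching)
        .build();
  }
}

Lowering the element count and delay thresholds trades some publish throughput for latency, since each request carries fewer messages.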
Thanks again for the advice on this :)

Josh

On Wed, May 24, 2017 at 9:31 PM, <[email protected]> wrote:

> Hi Raghu,
>
> Thanks for the suggestions and for having a look at the metrics!
> I will try the reshuffle and tweaking the publisher batching over the next couple of days, along with Ankur's suggestions, and will report back if any of these make a significant difference.
>
> Best regards,
> Josh
>
> On 24 May 2017, at 21:17, Raghu Angadi <[email protected]> wrote:
>
> Josh,
>
> Reshuffle might also be worth trying. To clarify, ~1s end-to-end is not always simple given the number of systems and services involved between the publisher and the eventual sink.
>
> Raghu.
>
> On Wed, May 24, 2017 at 12:32 PM, Raghu Angadi <[email protected]> wrote:
>
>> Thanks. Looked at some job system-level metrics. I see minimal latency within the Dataflow pipeline itself, so you might not see much improvement from Reshuffle() (maybe ~0.25 seconds).
>>
>> Do you have control over the publisher? Publishers might be batching hundreds of messages, which adds latency. You can try to reduce that. Even then, PubSub itself does some batching internally which might limit overall latency. Removing publisher batching is worth a try.
>>
>> On Wed, May 24, 2017 at 11:46 AM, Josh <[email protected]> wrote:
>>
>>> Hi Raghu,
>>>
>>> My job ID is 2017-05-24_02_46_42-11524480684503077480 - thanks for taking a look!
>>>
>>> Yes, I'm using BigtableIO for the sink and I am measuring the end-to-end latency. It typically takes 3-6 seconds; I would like to get it down to ~1s.
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 6:50 PM, Raghu Angadi <[email protected]> wrote:
>>>
>>>> Josh,
>>>>
>>>> Can you share your job_id? I could take a look. Are you measuring latency end-to-end (publisher to when it appears on BT)? Are you using BigtableIO for the sink?
>>>>
>>>> There is no easy way to use more workers when auto-scaling is enabled. It thinks your backlog and CPU are low enough and does not need to scale.
>>>>
>>>> Raghu.
>>>>
>>>> On Wed, May 24, 2017 at 10:14 AM, Josh <[email protected]> wrote:
>>>>
>>>>> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>>>>>
>>>>> About the "No operations completed" message - there are a few of these in the logs (but very few, like one an hour or something), so probably no need to scale up Bigtable.
>>>>> I did, however, see a lot of INFO messages saying "Wrote 0 records" in the logs. Probably about 50% of the "Wrote n records" messages are zero, while the other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that could indicate a bad setting?
>>>>>
>>>>> Josh
>>>>>
>>>>> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <[email protected]> wrote:
>>>>>
>>>>>> There are two main things to look at here:
>>>>>>
>>>>>> * In the logs, are there any messages like "No operations completed within the last 61 seconds. There are still 1 simple operations and 1 complex operations in progress."? This means you are underscaled on the Bigtable side and would benefit from increasing the node count.
>>>>>> * We also saw some improvement in performance (workload dependent) by going to a bigger worker machine type.
>>>>>> * Another optimization that worked for our use case:
>>>>>>
>>>>>> // streaming dataflow has larger machines with smaller bundles, so we can queue up a lot more without blowing up
>>>>>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>>>>>   return new BigtableOptions.Builder()
>>>>>>       .setProjectId(opts.getProject())
>>>>>>       .setInstanceId(opts.getBigtableInstanceId())
>>>>>>       .setUseCachedDataPool(true)
>>>>>>       .setDataChannelCount(32)
>>>>>>       .setBulkOptions(new BulkOptions.Builder()
>>>>>>           .setUseBulkApi(true)
>>>>>>           .setBulkMaxRowKeyCount(2048)
>>>>>>           .setBulkMaxRequestSize(8_388_608L)
>>>>>>           .setAsyncMutatorWorkerCount(32)
>>>>>>           .build())
>>>>>>       .build();
>>>>>> }
>>>>>>
>>>>>> There is a lot of trial and error involved in getting the end-to-end latency down, so I would suggest enabling profiling using the --saveProfilesToGcs option to get a sense of what exactly is happening.
>>>>>>
>>>>>> -- Ankur Chauhan
>>>>>>
>>>>>> On May 24, 2017, at 9:09 AM, Josh <[email protected]> wrote:
>>>>>>
>>>>>> Ah ok - I am using the Dataflow runner. I didn't realise about the custom implementation being provided at runtime...
>>>>>>
>>>>>> Any ideas of how to tweak my job to either lower the latency consuming from PubSub or to lower the latency in writing to Bigtable?
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <[email protected]> wrote:
>>>>>>>
>>>>>>>> Sorry, that was an autocorrect error. I meant to ask - what dataflow runner are you using? If you are using Google Cloud Dataflow then the PubsubIO class is not the one doing the reading from the pubsub topic. They provide a custom implementation at run time.
>>>>>>>>
>>>>>>>> Ankur Chauhan
>>>>>>>> Sent from my iPhone
>>>>>>>>
>>>>>>>> On May 24, 2017, at 07:52, Josh <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hi Ankur,
>>>>>>>>
>>>>>>>> What do you mean by runner address?
>>>>>>>> Would you be able to link me to the comment you're referring to?
>>>>>>>>
>>>>>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> What runner address you using. Google cloud dataflow uses a closed source version of the pubsub reader, as noted in a comment on the Read class.
>>>>>>>>>
>>>>>>>>> Ankur Chauhan
>>>>>>>>> Sent from my iPhone
>>>>>>>>>
>>>>>>>>> On May 24, 2017, at 04:05, Josh <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds between the messages being published and being written to Bigtable.
>>>>>>>>>
>>>>>>>>> I want to try and decrease the latency to <1s if possible - does anyone have any tips for doing this?
>>>>>>>>>
>>>>>>>>> I noticed that there is a PubsubGrpcClient (https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java), however the PubsubUnboundedSource is initialised with a PubsubJsonClient, so the Grpc client doesn't appear to be used. Is there a way to switch to the Grpc client, as perhaps that would give better performance?
>>>>>>>>>
>>>>>>>>> Also, I am running my job on Dataflow using autoscaling, which has only allocated one n1-standard-4 instance to the job, which is running at ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>>>>>>>
>>>>>>>>> Thanks for any advice,
>>>>>>>>> Josh
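PS for anyone who finds this thread later: the job is essentially a PubsubIO read -> format -> BigtableIO write, and the Reshuffle that Raghu suggested slots in between the read and the write (it mainly stops the read and the write from being fused into one step, so the write can start as soon as elements arrive). The sketch below only shows that shape under some assumptions, it is not my actual job: the project/instance/table names, the "cf" column family and FormatAsMutationFn are placeholders, and Reshuffle.viaRandomKey() may not be available on Beam 2.0.0, where WithKeys + Reshuffle.of() + Values achieves the same thing. Also, as Ankur pointed out, on the Dataflow runner the Pubsub read is replaced by a custom implementation at run time.

import com.google.bigtable.v2.Mutation;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.protobuf.ByteString;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;

public class PubsubToBigtablePipeline {

  // Placeholder DoFn: turns each Pubsub message into a single-cell Bigtable mutation.
  static class FormatAsMutationFn extends DoFn<String, KV<ByteString, Iterable<Mutation>>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Mutation mutation = Mutation.newBuilder()
          .setSetCell(Mutation.SetCell.newBuilder()
              .setFamilyName("cf") // placeholder column family
              .setColumnQualifier(ByteString.copyFromUtf8("payload"))
              .setValue(ByteString.copyFromUtf8(c.element()))
              .build())
          .build();
      Iterable<Mutation> mutations = Collections.singletonList(mutation);
      c.output(KV.of(ByteString.copyFromUtf8("row-" + c.element().hashCode()), mutations));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("ReadFromPubsub",
            PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
     // Raghu's suggestion: reshuffle between the read and the write.
     .apply("Reshuffle", Reshuffle.<String>viaRandomKey())
     .apply("ToMutations", ParDo.of(new FormatAsMutationFn()))
     .apply("WriteToBigtable",
            BigtableIO.write()
                .withBigtableOptions(new BigtableOptions.Builder()
                    .setProjectId("my-project")
                    .setInstanceId("my-instance")
                    .build())
                .withTableId("my-table"));

    p.run();
  }
}

In the end, as per the update above, the big win for us was on the publisher side rather than inside the pipeline itself.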
