There are two main things to look at here:
* In the logs, are there any messages like "No operations completed within the
last 61 seconds. There are still 1 simple operations and 1 complex operations
in progress."? That message means you are underscaled on the Bigtable side and
would benefit from increasing the node count.
* We also saw some performance improvement (this is workload dependent) by
moving to a bigger worker machine type.
* Another optimization that worked for our use case:
// Streaming Dataflow uses larger machines with smaller bundles, so we can
// queue up a lot more work without blowing up.
private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
  return new BigtableOptions.Builder()
      .setProjectId(opts.getProject())
      .setInstanceId(opts.getBigtableInstanceId())
      // Reuse the cached gRPC channel pool and spread load across more channels.
      .setUseCachedDataPool(true)
      .setDataChannelCount(32)
      // Batch mutations aggressively and run more async mutator workers.
      .setBulkOptions(new BulkOptions.Builder()
          .setUseBulkApi(true)
          .setBulkMaxRowKeyCount(2048)
          .setBulkMaxRequestSize(8_388_608L)
          .setAsyncMutatorWorkerCount(32)
          .build())
      .build();
}
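For reference, here is a minimal sketch (not from our actual pipeline) of how
these options can be wired into the Bigtable sink, assuming Beam 2.0.0's
BigtableIO.write() API; the table name and the mutations collection are
placeholders:

// Sketch only: plug the tuned options into BigtableIO.
// "events" and the mutations PCollection are hypothetical placeholders.
private static void writeToBigtable(
    PCollection<KV<ByteString, Iterable<Mutation>>> mutations,
    AnalyticsPipelineOptions opts) {
  mutations.apply("WriteToBigtable",
      BigtableIO.write()
          .withBigtableOptions(createStreamingBTOptions(opts))
          .withTableId("events"));
}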
There is a lot of trial and error involved in getting the end-to-end latency
down, so I would suggest enabling profiling using the --saveProfilesToGcs
option to get a sense of what exactly is happening.
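For example, you would launch the job with something along these lines (the
bucket path is just a placeholder):

  --runner=DataflowRunner --saveProfilesToGcs=gs://<your-bucket>/profiles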
— Ankur Chauhan
> On May 24, 2017, at 9:09 AM, Josh <[email protected]> wrote:
>
> Ah ok - I am using the Dataflow runner. I didn't realise about the custom
> implementation being provided at runtime...
>
> Any ideas of how to tweak my job to either lower the latency consuming from
> PubSub or to lower the latency in writing to Bigtable?
>
>
> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <[email protected]> wrote:
> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>
> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <[email protected]> wrote:
> Sorry that was an autocorrect error. I meant to ask - what dataflow runner
> are you using? If you are using google cloud dataflow then the PubsubIO class
> is not the one doing the reading from the pubsub topic. They provide a custom
> implementation at run time.
>
> Ankur Chauhan
> Sent from my iPhone
>
> On May 24, 2017, at 07:52, Josh <[email protected]> wrote:
>
>> Hi Ankur,
>>
>> What do you mean by runner address?
>> Would you be able to link me to the comment you're referring to?
>>
>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>
>> Thanks,
>> Josh
>>
>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <[email protected]> wrote:
>> What runner address you using. Google cloud dataflow uses a closed source
>> version of the pubsub reader as noted in a comment on Read class.
>>
>> Ankur Chauhan
>> Sent from my iPhone
>>
>> On May 24, 2017, at 04:05, Josh <[email protected]> wrote:
>>
>>> Hi all,
>>>
>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then writes
>>> the data out to Bigtable. I'm currently seeing a latency of 3-5 seconds
>>> between the messages being published and being written to Bigtable.
>>>
>>> I want to try and decrease the latency to <1s if possible - does anyone
>>> have any tips for doing this?
>>>
>>> I noticed that there is a PubsubGrpcClient
>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>> however the PubsubUnboundedSource is initialised with a PubsubJsonClient,
>>> so the Grpc client doesn't appear to be being used. Is there a way to
>>> switch to the Grpc client - as perhaps that would give better performance?
>>>
>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>> allocated one n1-standard-4 instance to the job, which is running at ~50%
>>> CPU. Could forcing a higher number of nodes help improve latency?
>>>
>>> Thanks for any advice,
>>> Josh
>>
>
>