Hi Lina,

Unfortunately I don't have a specific answer to Q1. For Q2 and Q3 we've been in 
a similar situation, and detailing our approach might be helpful. We're using 
the Go SDK and are streaming, but the approach should apply equally well to your case.

So far we have ended up with a custom transform that uses the official Bigtable 
client. That way it should be possible to implement any adaptive logic you'd 
like (Q2); however, we're using a simpler approach at the moment and it's been 
working well for us.

In our custom transform, the DoFn buffers up to 100,000 elements locally per 
bundle. If we hit 100k while processing a bundle, we flush those values to 
Bigtable in a single batch request; otherwise we flush at the end of the bundle 
(in DoFn.FinishBundle). That particular number comes from an API limit on 
MutateRows requests [1].
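
To make that concrete, here is a rough Python sketch of the same pattern. Our 
actual transform is written with the Go SDK, so everything below is 
illustrative: the class name, how the rows are constructed, and the error 
handling are all things you'd adapt to your own pipeline.

import apache_beam as beam
from google.cloud import bigtable

# Illustrative sketch only. Buffers rows per bundle and flushes them to
# Bigtable in batches of up to 100,000 (see the MutateRows limit in [1]),
# plus one final flush in finish_bundle.
class BufferedBigtableWriteFn(beam.DoFn):
    MAX_BUFFERED = 100_000

    def __init__(self, project_id, instance_id, table_id):
        self.project_id = project_id
        self.instance_id = instance_id
        self.table_id = table_id

    def start_bundle(self):
        # Create the client here rather than in __init__ so the DoFn
        # stays picklable.
        client = bigtable.Client(project=self.project_id)
        self.table = client.instance(self.instance_id).table(self.table_id)
        self.rows = []

    def process(self, element):
        # `element` is assumed to already be a google.cloud.bigtable
        # DirectRow with its mutations set; adapt this to your own types.
        self.rows.append(element)
        if len(self.rows) >= self.MAX_BUFFERED:
            self._flush()

    def finish_bundle(self):
        self._flush()

    def _flush(self):
        if not self.rows:
            return
        # mutate_rows sends one MutateRows batch and returns per-row
        # statuses, so you can see exactly which rows failed.
        statuses = self.table.mutate_rows(self.rows)
        failed = [s for s in statuses if s.code != 0]
        if failed:
            raise RuntimeError('%d row mutations failed' % len(failed))
        self.rows = []

You'd apply it with something like 
beam.ParDo(BufferedBigtableWriteFn(project, instance, table)) after whatever 
grouping you do.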

For this first, more naive approach we store the values in memory, mainly 
because the Go SDK has not supported state so far. It's been working well, but 
I would eventually swap it for storing the values in state along with a count. 
Since you're using the Python SDK, this should be possible and, I imagine, 
fairly easy.
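
If you do go the state route, the sketch below is roughly what I have in mind. 
It is untested and again illustrative: the class name and the choice of coders 
are mine, stateful DoFns in Python require a keyed PCollection, and you'd still 
want a timer to flush whatever is left under the threshold (left out here for 
brevity).

import apache_beam as beam
from apache_beam.coders import PickleCoder, VarIntCoder
from apache_beam.transforms.userstate import BagStateSpec, ReadModifyWriteStateSpec

# Sketch of the stateful variant: the buffered rows and the count live in
# Beam state rather than in plain instance attributes. Input must be
# (key, row) pairs. A timer to flush the remainder is omitted.
class StatefulBufferFn(beam.DoFn):
    BUFFER = BagStateSpec('buffer', PickleCoder())
    COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())

    MAX_BUFFERED = 100_000

    def process(self, element,
                buffer=beam.DoFn.StateParam(BUFFER),
                count=beam.DoFn.StateParam(COUNT)):
        _, row = element
        buffer.add(row)
        n = (count.read() or 0) + 1
        count.write(n)
        if n >= self.MAX_BUFFERED:
            # Emit the full batch downstream (e.g. to a flushing DoFn like
            # the one above) and reset the state.
            yield list(buffer.read())
            buffer.clear()
            count.clear()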

Hope that helps!

[1] 
https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowsRequest



________________________________________
From: Lina Mårtensson via user <[email protected]>
Sent: 30 September 2022 01:58
To: [email protected]
Subject: [Question] How to best handle load to Bigtable from Beam (Python)

Hi Beam,

I've happily been developing my Python Beam batch pipeline that writes to 
Bigtable for a while, and the time finally came to catch up on all our existing 
data. It's readings from many smart meters, every 15 minutes, organized in 
files coming in continuously over time - but for now, I want to run over these 
files in batch mode. I currently have ~6T entries, each with 5-10 values 
(floats) that I want to save.

Our Bigtable is using time buckets where all data for a day is bucketed 
together, with 5-10 columns filled out per row, a (packed) float in each.

I've been trying to set up my Beam job so that the writes to Bigtable can be 
efficient. The naive way is to write one cell at a time, which is as expected 
quite inefficient and the writes can be all over the place in the table.

First off, I tried to optimize by grouping my entries and writing an entire row 
(96 timestamps in a day * ~5-10 columns) at a time - but that fails as soon as 
I get to the WriteToBigtable stage, with messages like the following on every 
worker, whether I have a big job or run on a smaller data set:
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigtableio.py", line 
184, in process self.batcher.mutate(row) File 
"/usr/local/lib/python3.9/site-packages/google/cloud/bigtable/batcher.py", line 
98, in mutate self.flush() File 
"/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigtableio.py", line 
81, in flush raise Exception( Exception: Failed to write a batch of 285 records 
[while running 'Write to BigTable/ParDo(_BigTableWriteFn)-ptransform-27']
I understand that I'm encountering this bug 
<https://github.com/googleapis/python-bigtable/issues/485> as there is no error 
message from Bigtable.
Question 1: Is there a workaround for this or at least a guess on what might be 
going wrong? The error is consistent. I estimate each row I try to write to be 
on the order of ~10-15 kB.

I tried some smaller batches, but eventually just reverted to letting Beam 
write one cell at a time, and it seems like I can still make that a bit better 
with some careful grouping of entries before the BT write. That works OK, but I 
end up with a Beam job that has a whole bunch of relatively idle CPUs while the 
BT nodes (currently 3) are overloaded. I can, and perhaps should, allow BT to 
scale to more nodes, but isn't there a better way?
Question 2: Is there a way to tell Beam to adapt its write rate to Bigtable as 
well as lower its number of workers?

Question 3/perhaps an alternative: I don't actually have to run on all the data 
at once. It's useful, for performance reasons, to group entries for the same 
meter across time to some extent, but there are no dependencies across 
timestamps. Would it perhaps make sense to let my job run in sequenced batches 
instead, and is there an easy way of doing so? I was thinking of using fixed 
windows, but that doesn't affect the initial read, and it's also unclear 
whether it does much more for me than a GroupByKey with carefully chosen keys 
in batch mode.

That's a lot of questions, I know, but it seems like there could be many 
options on how to solve my performance problem!

Thanks,
-Lina
