Hi Lina,

Unfortunately I don't have specific answers to Q1. For Q2 and Q3 we've been in a similar situation, so detailing our approach might be helpful. We're using the Go SDK and are streaming, but the approach should still apply.

So far we've ended up with a custom transform that uses the official Bigtable client. This way it should be possible to implement any adaptive logic you'd like (Q2); however, we're using a simpler approach at the moment and it's been working well for us. In our custom transform the DoFn locally stores up to 100,000 elements per bundle. If we hit 100k while processing the bundle, we flush those values to Bigtable in a single batch request; otherwise we flush at the end of the bundle (in DoFn.FinishBundle). That particular number comes from an API limit [1].

For this first, more naive approach we're storing the values in memory, mainly because the Go SDK has not supported state so far, and it's working well; eventually I would swap this for storing the values in state along with the count. Since you're using the Python SDK this should be possible, and I imagine fairly easy.
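In case it's useful, here is a rough, untested sketch of what that buffering DoFn could look like with the Python SDK and the google-cloud-bigtable client (the ids and the element shape below are just placeholders, not our actual code):

import apache_beam as beam
from google.cloud import bigtable

# Per-request limit on mutations for MutateRows, see [1].
MAX_MUTATIONS_PER_REQUEST = 100000


class BufferedBigtableWriteFn(beam.DoFn):
    """Buffers rows per bundle and flushes them in batched MutateRows calls."""

    def __init__(self, project_id, instance_id, table_id):
        self._project_id = project_id
        self._instance_id = instance_id
        self._table_id = table_id

    def setup(self):
        client = bigtable.Client(project=self._project_id)
        self._table = client.instance(self._instance_id).table(self._table_id)

    def start_bundle(self):
        self._rows = []
        self._mutation_count = 0

    def process(self, element):
        # element is assumed to be (row_key, [(family, column, value_bytes), ...]).
        row_key, cells = element
        if self._mutation_count + len(cells) > MAX_MUTATIONS_PER_REQUEST:
            self._flush()
        row = self._table.direct_row(row_key)
        for family, column, value in cells:
            row.set_cell(family, column, value)
        self._rows.append(row)
        self._mutation_count += len(cells)

    def finish_bundle(self):
        self._flush()

    def _flush(self):
        if not self._rows:
            return
        statuses = self._table.mutate_rows(self._rows)
        failed = [status for status in statuses if status.code != 0]
        if failed:
            raise RuntimeError(
                'Failed to write %d of %d rows' % (len(failed), len(statuses)))
        self._rows = []
        self._mutation_count = 0

You would then apply it with beam.ParDo(BufferedBigtableWriteFn(project_id, instance_id, table_id)) in place of the built-in WriteToBigTable.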
Hope that helps!

[1] https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.MutateRowsRequest

________________________________________
From: Lina Mårtensson via user <[email protected]>
Sent: 30 September 2022 01:58
To: [email protected]
Subject: [Question] How to best handle load to Bigtable from Beam (Python)

Hi Beam,

I've happily been developing my Python Beam batch pipeline that writes to Bigtable for a while, and the time finally came to catch up on all our existing data. It's readings from many smart meters, every 15 minutes, organized in files coming in continuously over time - but for now, I want to run over these files in batch mode. I currently have ~6T entries, each with 5-10 values (floats) that I want to save. Our Bigtable is using time buckets where all data for a day is bucketed together, with 5-10 columns filled out per row, a (packed) float in each.

I've been trying to set up my Beam job so that the writes to Bigtable can be efficient. The naive way is to write one cell at a time, which is as expected quite inefficient, and the writes can be all over the place in the table. First off, I tried to optimize by grouping my entries and writing an entire row (96 timestamps in a day * ~5-10 columns) at a time - but that fails as soon as I get to the WriteToBigtable stage, with messages like the following on every worker, whether I have a big job or run on a smaller data set:

  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigtableio.py", line 184, in process
    self.batcher.mutate(row)
  File "/usr/local/lib/python3.9/site-packages/google/cloud/bigtable/batcher.py", line 98, in mutate
    self.flush()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigtableio.py", line 81, in flush
    raise Exception(
Exception: Failed to write a batch of 285 records [while running 'Write to BigTable/ParDo(_BigTableWriteFn)-ptransform-27']

I understand that I'm encountering this bug <https://github.com/googleapis/python-bigtable/issues/485>, as there is no error message from Bigtable.

Question 1: Is there a workaround for this, or at least a guess at what might be going wrong? The error is consistent.
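For reference, the row-at-a-time version looks roughly like this (heavily simplified; the column family, ids and key format are just placeholders):

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import WriteToBigTable
from google.cloud.bigtable.row import DirectRow


def to_direct_row(grouped):
    # grouped is (row_key, iterable of (column, packed_float_bytes)),
    # i.e. all readings for one meter/day bucket collected together.
    row_key, cells = grouped
    row = DirectRow(row_key=row_key)
    for column, value in cells:
        row.set_cell('readings', column, value)
    return row


with beam.Pipeline() as p:
    _ = (p
         | beam.Create([('meter001#2022-09-29', ('v0000', b'\x3f\x80\x00\x00'))])
         | beam.GroupByKey()
         | beam.Map(to_direct_row)
         | WriteToBigTable(project_id='my-project',
                           instance_id='my-instance',
                           table_id='my-table'))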
I estimate each row I try to write to be on the order of ~10-15 kB. I tried some smaller batches, but eventually just reverted to letting Beam write one cell at a time, and it seems like I can still make that a bit better with some careful grouping of entries before the BT write. That works OK, but I end up with a Beam job that has a whole bunch of relatively idle CPUs while the BT nodes (currently 3) are overloaded. I can, and perhaps should, allow BT to scale to more nodes, but isn't there a better way?

Question 2: Is there a way to tell Beam to adapt its write rate to Bigtable, as well as lower its number of workers?

Question 3/perhaps an alternative: I don't actually have to run on all the data at once. It's useful, for performance reasons, to group entries for the same meter across time to some extent, but there are no dependencies across timestamps. Would it perhaps make sense to let my job run in sequenced batches instead, and is there an easy way of doing so? I was thinking of using fixed windows, but that doesn't affect the initial read, and it's also unclear whether it does much more for me than a GroupByKey with carefully chosen keys in batch mode.

That's a lot of questions, I know, but it seems like there could be many options for solving my performance problem!

Thanks,
-Lina
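PS: On Question 3, the "GroupByKey with carefully chosen keys" idea could be as simple as keying each reading by meter and day bucket before building the rows; a rough illustration (the field names are assumptions about your data):

import apache_beam as beam


def key_by_meter_and_day(reading):
    # reading is assumed to be a dict with a 'meter_id' and a datetime 'timestamp';
    # the key matches a day-bucketed rowkey so that one group becomes one row.
    day = reading['timestamp'].date().isoformat()
    return ('%s#%s' % (reading['meter_id'], day), reading)


# ... | beam.Map(key_by_meter_and_day) | beam.GroupByKey() | ...
# then build and write one Bigtable row per group.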
