Thank you both for the thoughtful responses. The main concept I was missing was what "chunk" and "batch" mean in the Arrow context. It makes more sense now to think of chunk sizes in terms of rows, with "max_chunksize" being the maximum number of rows in a batch rather than a byte limit.
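For anyone reading this in the archive, here is a small illustration of that row-based behavior (a sketch; the outputs in the comments are what I would expect, not captured from a real run):

```
import pyarrow as pa

# An 8-row table stored as a single chunk.
tbl = pa.table({
    "n_legs": [2, 4, 5, 100, 2, 4, 5, 100],
    "animals": ["Flamingo", "Dog", "Brittle stars", "Centipede"] * 2,
})

# max_chunksize caps the number of ROWS per batch, not the number of bytes.
batches = tbl.to_batches(max_chunksize=3)
print([b.num_rows for b in batches])  # expected: [3, 3, 2]
print([b.nbytes for b in batches])    # byte sizes vary with the data in each slice
```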
I'm documenting my thought process on the question above for future reference.

To create record batches of a predetermined size from an Arrow table, we need to work out how many rows should go into each batch, because the number of rows in a batch directly determines its size in bytes. Since Arrow is a columnar store, every column must be present in a batch, so the smallest unit of storage is one row across all columns, and a batch's byte size scales with the number of rows packed into it (similar to Parquet's "row group" concept). The relationship is roughly:

    (target file size) ≈ (average row size, fixed by the set of columns) * (number of rows, which we can vary)

To hit a target file size, we estimate the average row size from the table itself and solve for the row count:

    (average row size) = (table size in bytes) / (total number of rows)
    (rows per batch) = (target file size) / (average row size)

We can then pass that row count as `to_batches`'s "max_chunksize" to get batches of roughly the desired target file size. The above method is implemented here
<https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1768-R1770>
so that we can split an Arrow table into multiple ~512 MB batches and write them in parallel as separate Parquet files. In cases where many small chunks are produced, such as Case #1, the code then bin-packs
<https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1771-R1777>
the smaller chunks up to the target file size. (A simplified sketch of this calculation is included at the end of this message.)

On Mon, Feb 26, 2024 at 9:25 AM Aldrin <[email protected]> wrote:

> Hi Kevin,
>
> Shoumyo is correct that the chunk size of to_batches is row-based (logical)
> and not byte-based (physical); see the example in the documentation [1].
> And for more clarity on the "...depending on the chunk layout of individual
> columns" portion, a Table column is a ChunkedArray, which is essentially a
> vector<Array>, so that determines the chunking for the table. When using
> to_batches(), the logic is essentially to see if a chunk is too big and
> needs to be resized [2], otherwise leave it as is.
>
> There are functions to get total buffer size (physical size) such as
> get_total_buffer_size [3] or nbytes [4]. But there is a bit of guess and
> check to use that directly for your needs. I don't think there is a
> canonical approach to satisfying your requirement, but you can check the
> docs for IPC streams [5] or the dataset API ("configuring rows per group"
> [6]) if you're eventually going to write to files.
>
> Otherwise, a naive approach is essentially a loop that tries varying chunk
> sizes until it produces a satisfactory byte size. If you're willing to do
> some coding at the cython level (since you're looking at that source), then
> you can get a TableBatchReader and iteratively set a chunksize and read the
> next batch (loop around [7]); or maybe use slice() for cheap chunking. I
> would recommend making your initial chunksize guess based on your schema or
> by "empirically" checking a chunksize of 1.
>
> [1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
> [2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
> [3]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
> [4]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
> [5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
> [6]: https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
> [7]: https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248
>
> # ------------------------------
> # Aldrin
>
> https://github.com/drin/
> https://gitlab.com/octalene
> https://keybase.io/octalene
>
> On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti
> (BLOOMBERG/731 LEX) <[email protected]> wrote:
>
> Hi Kevin,
>
> I'm not an Arrow dev so take everything I say with a grain of salt. I just
> wanted to point out that `max_chunksize` appears to refer to the max number
> of *rows* per batch rather than the number of *bytes* per batch:
> https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649
>
> Additionally, `Table.to_batches()` is documented as being zero-copy, and
> as you referred: "Individual chunks may be smaller depending on the chunk
> layout of individual columns". This method will not copy the data in an
> effort to make RecordBatches of more uniform size.
>
> In light of this information:
>
> - In Case #1: `pylist_tbl.to_batches() * multiplier` creates 2048
> RecordBatches, and the Table becomes a zero-copy wrapper on top of the data
> for each of these RecordBatches. Each ChunkedArray in the Table will
> contain 2048 chunks/arrays. When this Table is converted via to_batches(),
> by its zero-copy nature, the chunks will not be concatenated. Therefore
> `bigger_pylist_tbl.to_batches()` will yield at least 2048 batches.
>
> - In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention that
> the Table only has a single RecordBatch, so there is nothing to chunk up.
>
> I'm not sufficiently familiar with the Arrow APIs to know of a way to
> chunk by a target number of bytes, so I'll let others chime in on that.
>
> Best,
> Shoumyo
>
> From: [email protected] At: 02/26/24 01:08:41 UTC-5:00
> To: [email protected]
> Subject: Chunk Table into RecordBatches of at most 512MB each
>
> Hey folks,
>
> I'm working with the PyArrow API for Tables and RecordBatches, and I'm
> trying to chunk a Table into a list of RecordBatches, each with a default
> chunk size. For example, 10 GB into several 512 MB chunks.
>
> I'm having a hard time doing this using the existing API. The
> Table.to_batches method has an optional parameter `max_chunksize`, which
> is documented as "Maximum size for RecordBatch chunks. Individual chunks
> may be smaller depending on the chunk layout of individual columns." It
> seems exactly like what I want, but I've run into a couple of edge cases.
>
> Edge case 1, Table created using many RecordBatches:
> ```
> import pyarrow as pa
>
> pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
>           {'n_legs': 4, 'animals': 'Dog'}]
> pylist_tbl = pa.Table.from_pylist(pylist)
> # pylist_tbl.nbytes
> # 35
>
> multiplier = 2048
> bigger_pylist_tbl = pa.Table.from_batches(pylist_tbl.to_batches() * multiplier)
> # bigger_pylist_tbl.nbytes
> # 591872 / 578.00 KB
>
> target_batch_size = 512 * 1024 * 1024  # 512 MB
> len(bigger_pylist_tbl.to_batches(target_batch_size))
> # 2048
> # expected: 1 RecordBatch
> ```
>
> Edge case 2, really big Table with 1 RecordBatch:
> ```
> # file already saved on disk
> with pa.memory_map('table_10000000.arrow', 'r') as source:
>     huge_arrow_tbl = pa.ipc.open_file(source).read_all()
>
> huge_arrow_tbl.nbytes
> # 7188263146 / 6.69 GB
> len(huge_arrow_tbl)
> # 10_000_000
>
> target_batch_size = 512 * 1024 * 1024  # 512 MB
> len(huge_arrow_tbl.to_batches(target_batch_size))
> # 1
> # expected: (6.69 GB // 512 MB) + 1 RecordBatches
> ```
>
> I'm currently exploring the underlying implementation for to_batches
> <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/python/pyarrow/table.pxi#L4182C26-L4250>
> and TableBatchReader::ReadNext
> <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L641-L691>.
> Please let me know if anyone knows a canonical way to satisfy the chunking
> behavior described above.
>
> Thanks,
> Kevin
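
P.S. For anyone finding this thread later, here is a simplified, illustrative sketch of the calculation and bin-packing described at the top of this message. It is not the actual implementation (that lives in the linked iceberg-python PR); the function name and the greedy grouping below are just stand-ins:

```
import pyarrow as pa

def batches_for_target_size(tbl: pa.Table, target_size: int = 512 * 1024 * 1024):
    """Yield groups of RecordBatches whose combined nbytes is roughly target_size."""
    # Estimate the average row size from the table itself.
    avg_row_size = tbl.nbytes / max(tbl.num_rows, 1)
    rows_per_batch = max(int(target_size / avg_row_size), 1)

    # max_chunksize caps the rows per batch; batches can still come out smaller
    # when the table's existing chunk layout is finer-grained (as in Case #1).
    batches = tbl.to_batches(max_chunksize=rows_per_batch)

    # Greedily bin-pack small batches together until a group reaches the target.
    group, group_bytes = [], 0
    for batch in batches:
        group.append(batch)
        group_bytes += batch.nbytes
        if group_bytes >= target_size:
            yield group
            group, group_bytes = [], 0
    if group:
        yield group

# Each group can then be turned back into a Table, e.g. pa.Table.from_batches(group),
# and written as its own ~512 MB parquet file.
```

Note that the greedy grouping will slightly overshoot the target on the batch that crosses the threshold; a real implementation may want to handle that boundary batch more carefully.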

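And for completeness, a variant of the cheap slice()-based chunking that Aldrin mentioned, again only a sketch (the function name and file naming are made up for illustration). Table.slice() is zero-copy and works on row boundaries regardless of the existing chunk layout, so it behaves the same for Case #1 (many tiny chunks) and Case #2 (one huge chunk):

```
import pyarrow as pa
import pyarrow.parquet as pq

def write_in_slices(tbl: pa.Table, prefix: str, target_size: int = 512 * 1024 * 1024):
    """Write the table as a series of roughly target_size parquet files using zero-copy slices."""
    avg_row_size = tbl.nbytes / max(tbl.num_rows, 1)
    rows_per_file = max(int(target_size / avg_row_size), 1)
    for i, offset in enumerate(range(0, tbl.num_rows, rows_per_file)):
        # slice() clamps at the end of the table, so the last file is simply smaller.
        pq.write_table(tbl.slice(offset, rows_per_file), f"{prefix}-{i}.parquet")
```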