Hi Kevin,
Shoumyo is correct: the chunk size for `to_batches()` is row-based (logical), not byte-based (physical); see the example in the documentation [1]. To clarify the "...depending on the chunk layout of individual columns" part: a Table column is a `ChunkedArray`, which is essentially a `vector<Array>`, so that existing layout determines how the table is chunked. When you call `to_batches()`, the logic essentially checks whether each chunk exceeds `max_chunksize` and splits it if it does [2]; otherwise the chunk is passed through as is.
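To make that concrete, here is a tiny illustration (my own example, using only public pyarrow APIs; the values in the comments are what I'd expect given the splitting logic above):
```
import pyarrow as pa

# A Table built from two 3-row batches keeps two chunks per column.
batch = pa.RecordBatch.from_pydict({'x': [1, 2, 3]})
tbl = pa.Table.from_batches([batch, batch])

tbl.column('x').num_chunks            # 2 -- each column is a ChunkedArray of 2 Arrays
len(tbl.to_batches())                 # 2 -- the existing chunk layout is preserved
len(tbl.to_batches(max_chunksize=2))  # 4 -- each 3-row chunk is split into 2 + 1 rows
```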
There are functions that report physical (buffer) size, such as `get_total_buffer_size` [3] or `nbytes` [4], but using them directly for your goal involves a bit of guess-and-check. I don't think there is a canonical approach that satisfies your requirement, but you can check the docs for IPC streams [5] or the dataset API ("configuring rows per group" [6]) if you're eventually going to write to files.
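For example, one rough way to turn a byte budget into a row count for `max_chunksize` is to divide by the average row width; note that `nbytes` accounts for array offsets (i.e. slices), unlike `get_total_buffer_size`. This is only a sketch under the assumption that rows are roughly uniform in width, and the helper name is mine:
```
import pyarrow as pa

def rows_for_target(tbl: pa.Table, target_bytes: int) -> int:
    """Estimate a max_chunksize (in rows) from the table's average row width."""
    avg_row_bytes = tbl.nbytes / max(tbl.num_rows, 1)
    return max(1, int(target_bytes // avg_row_bytes))

# e.g. aim for roughly 512 MB per batch:
# batches = tbl.to_batches(max_chunksize=rows_for_target(tbl, 512 * 1024 * 1024))
```
Keep in mind that `to_batches()` only splits existing chunks, it never concatenates them, so for a table with many tiny chunks (your case #1) you would need `Table.combine_chunks()` first, at the cost of a copy.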
Otherwise, a naive approach is essentially a loop that tries varying chunk sizes until it produces a satisfactory byte size. If you're willing to do some coding at the Cython level (since you're already looking at that source), you can get a TableBatchReader, iteratively set a chunksize, and read the next batch (loop around [7]); or you can use `slice()` for cheap, zero-copy chunking (a rough sketch follows below). I would recommend making your initial chunksize guess based on your schema, or "empirically" by checking the byte size of a chunksize of 1.
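Here is a rough sketch of that slice-based guess-and-check (not a canonical solution; the function name and the halving heuristic are just my illustration):
```
import pyarrow as pa

def iter_slices_by_bytes(tbl: pa.Table, target_bytes: int):
    """Yield zero-copy Table slices whose nbytes stays at or below target_bytes
    (best effort: a single very wide row can still exceed the target)."""
    offset = 0
    while offset < tbl.num_rows:
        remainder = tbl.slice(offset)
        # initial guess from the average row width of what's left
        avg_row_bytes = max(remainder.nbytes / remainder.num_rows, 1)
        rows = max(1, int(target_bytes // avg_row_bytes))
        chunk = tbl.slice(offset, rows)
        # shrink if the guess overshoots (e.g. unusually wide rows up front)
        while chunk.nbytes > target_bytes and rows > 1:
            rows //= 2
            chunk = tbl.slice(offset, rows)
        yield chunk
        offset += rows
```
If you truly need RecordBatch objects of bounded byte size, each yielded slice can then be collapsed with something like `sl.combine_chunks().to_batches()`, which does copy.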
[1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
[5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
[6]: https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
[7]: https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248
# ------------------------------
# Aldrin
https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene
On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti (BLOOMBERG/ 731
LEX) <[email protected]> wrote:
> Hi Kevin,
> I'm not an Arrow dev, so take everything I say with a grain of salt. I just
> wanted to point out that the `max_chunksize` appears to refer to the max
> number of *rows* per batch rather than the number of *bytes* per batch:
> https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649.
>
> Additionally, `Table.to_batches()` is documented as being zero-copy, and, as
> you quoted, "Individual chunks may be smaller depending on the chunk layout
> of individual columns". In other words, the method will not copy data just to
> make the RecordBatches more uniform in size.
>
> In light of this information:
>
>
> - In Case #1: `pylist_tbl.to_batches() * multiplier` creates 2048
> RecordBatches, and the Table becomes a zero-copy wrapper on top of the data
> for each of these RecordBatches. Each ChunkedArray in the Table will contain
> 2048 chunks/arrays. When this Table is converted via `to_batches()`, by its
> zero-copy nature, the chunks will not be concatenated. Therefore
> `bigger_pylist_tbl.to_batches()` will yield at least 2048 batches.
>
> - In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention that
> the Table only has a single RecordBatch, so there is nothing to chunk up.
>
>
> I'm not sufficiently familiar with the Arrow APIs to know of a way to chunk
> by a target number of bytes, so I'll let others chime in on that.
>
> Best,
> Shoumyo
>
> From: [email protected] At: 02/26/24 01:08:41 UTC-5:00
> To: [email protected]
> Subject: Chunk Table into RecordBatches of at most 512MB each
>
> > Hey folks,
> >
> > I'm working with the PyArrow API for Tables and RecordBatches. And I'm
> > trying to chunk a Table into a list of RecordBatches each with a default
> > chunk size. For example, 10 GB into several 512MB chunks.
> >
> > I'm having a hard time doing this using the existing API. The
> > Table.to_batches method has an optional parameter `max_chunksize` which is
> > documented as "Maximum size for RecordBatch chunks. Individual chunks may
> > be smaller depending on the chunk layout of individual columns." It seems
> > exactly like what I want but I've run into a couple of edge cases.
> >
> >
> > Edge case 1, Table created using many RecordBatches
> > ```
> > import pyarrow as pa
> >
> > pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
> >           {'n_legs': 4, 'animals': 'Dog'}]
> > pylist_tbl = pa.Table.from_pylist(pylist)
> >
> > # pylist_tbl.nbytes
> > # > 35
> >
> > multiplier = 2048
> > bigger_pylist_tbl = pa.Table.from_batches(pylist_tbl.to_batches() * multiplier)
> > # bigger_pylist_tbl.nbytes
> > # > 591872 / 578.00 KB
> >
> > target_batch_size = 512 * 1024 * 1024  # 512 MB
> > len(bigger_pylist_tbl.to_batches(target_batch_size))
> > # > 2048
> > # expected: 1 RecordBatch
> > ```
> >
> >
> > Edge case 2, really big Table with 1 RecordBatch
> > ```
> > import pyarrow as pa
> >
> > # file already saved on disk
> > with pa.memory_map('table_10000000.arrow', 'r') as source:
> >     huge_arrow_tbl = pa.ipc.open_file(source).read_all()
> >
> > huge_arrow_tbl.nbytes
> > # > 7188263146 / 6.69 GB
> > len(huge_arrow_tbl)
> > # > 10_000_000
> >
> > target_batch_size = 512 * 1024 * 1024  # 512 MB
> > len(huge_arrow_tbl.to_batches(target_batch_size))
> > # > 1
> > # expected: (6.69 GB // 512 MB) + 1 RecordBatches
> > ```
> >
> >
> > I'm currently exploring the underlying implementation for to_batches and
> > TableBatchReader::ReadNext.
> > Please let me know if anyone knows a canonical way to satisfy the chunking
> > behavior described above.
> >
> >
> > Thanks,
> > Kevin
