Yes, you can reduce your memory footprint. Both the RecordBatchStreamReader and the RecordBatchFileReader support reading a table a batch at a time. Compression is applied on a per-batch basis so there is no need to read the entire file just to decompress it.
For this to work, the file will need to have been written as multiple batches in the first place. You can use the RecordBatchFileWriter/RecordBatchStreamWriter to do this or you can set `chunksize` when using pyarrow.feather.write_feather. The default chunk size for write_feather is 64k and most tools that create arrows files will create reasonable sized chunks by default so this shouldn't be a problem. On Tue, Jul 13, 2021 at 12:06 PM Arun Joseph <[email protected]> wrote: > cool, that's good to know. I guess for now I'll just use the older method > until support is exposed for compression_level. I do have an unrelated > question: > > Is there a way to reduce the memory overhead when loading a compressed > feather file? I believe right now I decompress the file and then load the > entire thing into memory. Not sure if chunking is something that is > applicable here. I've read this article[1] from a couple of years back. > Would the right approach be to use pyarrow.RecordBatchStreamer to read a > file that was written with chunks and skip chunks that contain series I > don't care about? However, would that even reduce the memory footprint if > the file was compressed in the first place? or is the compression applied > on a per-chunk basis? > > [1] https://wesmckinney.com/blog/arrow-streaming-columnar/ > > On Tue, Jul 13, 2021 at 5:26 PM Weston Pace <[email protected]> wrote: > >> Ah, good catch. Looks like this is missing[1]. The default compression >> level for zstd is 1. >> >> [1] https://issues.apache.org/jira/browse/ARROW-13091 >> >> On Tue, Jul 13, 2021 at 10:39 AM Arun Joseph <[email protected]> wrote: >> >>> The IPC API seems to work for the most part, however is there a way to >>> specify compression level with IpcWriteOptions? It doesn't seem to be >>> exposed. I'm currently using zstd, so not sure what level it defaults to >>> otherwise: >>> Additionally, should I be enabling the allow_64bit bool? I have >>> nanosecond timestamps which would be truncated if it this option acts the >>> way I think it does. >>> >>> ``` >>> """ >>> Serialization options for the IPC format. >>> >>> Parameters >>> ---------- >>> metadata_version : MetadataVersion, default MetadataVersion.V5 >>> The metadata version to write. V5 is the current and latest, >>> V4 is the pre-1.0 metadata version (with incompatible Union layout). >>> allow_64bit: bool, default False >>> If true, allow field lengths that don't fit in a signed 32-bit int. >>> use_legacy_format : bool, default False >>> Whether to use the pre-Arrow 0.15 IPC format. >>> compression: str or None >>> If not None, compression codec to use for record batch buffers. >>> May only be "lz4", "zstd" or None. >>> use_threads: bool >>> Whether to use the global CPU thread pool to parallelize any >>> computational tasks like compression. >>> emit_dictionary_deltas: bool >>> Whether to emit dictionary deltas. Default is false for maximum >>> stream compatibility. >>> """ >>> >>> >>> On Tue, Jul 13, 2021 at 2:41 PM Weston Pace <[email protected]> >>> wrote: >>> >>>> I can't speak to the intent. Adding a feather.write_table version >>>> (equivalent to feather.read_table) seems like it would be reasonable. >>>> >>>> > Is the best way around this to do the following? >>>> >>>> What you have written does not work for me. This slightly different >>>> version does: >>>> >>>> ```python3 >>>> import pyarrow as pa >>>> import pyarrow._feather as _feather >>>> >>>> table = pa.Table.from_pandas(df) >>>> _feather.write_feather(table, '/tmp/foo.feather', >>>> compression=compression, >>>> compression_level=compression_level, >>>> chunksize=chunksize, version=version) >>>> ``` >>>> >>>> I'm not sure it's a great practice to be relying on pyarrow._feather >>>> though as it is meant to be internal and subject to change without >>>> much consideration. >>>> >>>> You might want to consider using the newer IPC API which should be >>>> equivalent (write_feather is indirectly using a RecordBatchFileWriter >>>> under the hood although it is buried in the C++[1]). A complete >>>> example: >>>> >>>> ```python3 >>>> import pandas as pd >>>> import pyarrow as pa >>>> import pyarrow.ipc >>>> >>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) >>>> compression = None >>>> >>>> options = pyarrow.ipc.IpcWriteOptions() >>>> options.compression = compression >>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo2.feather', >>>> schema=table.schema, options=options) >>>> writer.write_table(table) >>>> writer.close() >>>> ``` >>>> >>>> If you need chunks it is slightly more work: >>>> >>>> ```python3 >>>> options = pyarrow.ipc.IpcWriteOptions() >>>> options.compression = compression >>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo3.feather', >>>> schema=table.schema, options=options) >>>> batches = table.to_batches(chunksize) >>>> for batch in batches: >>>> writer.write_batch(batch) >>>> writer.close() >>>> ``` >>>> >>>> All three versions should be readable by pyarrow.feather.read_feather >>>> and should yield the exact same dataframe. >>>> >>>> [1] >>>> https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/feather.cc#L796 >>>> >>>> On Tue, Jul 13, 2021 at 7:06 AM Arun Joseph <[email protected]> wrote: >>>> > >>>> > Hi, >>>> > >>>> > I've noticed that if I pass a pandas dataframe to write_feather >>>> (hyperlink to relevant part of code), it will automatically drop the index. >>>> Was this behavior intentionally chosen to only drop the index and not to >>>> allow the user to specify? I assumed the behavior would match the default >>>> behavior of converting from a pandas dataframe to an arrow table as >>>> mentioned in the docs. >>>> > >>>> > Is the best way around this to do the following? >>>> > >>>> > ```python3 >>>> > import pyarrow.lib as ext >>>> > from pyarrow.lib import Table >>>> > >>>> > table = Table.from_pandas(df) >>>> > ext.write_feather(table, dest, >>>> > compression=compression, >>>> compression_level=compression_level, >>>> > chunksize=chunksize, version=version) >>>> > ``` >>>> > Thank You, >>>> > -- >>>> > Arun Joseph >>>> > >>>> >>> >>> >>> -- >>> Arun Joseph >>> >>> > > -- > Arun Joseph > >
