On Wed, 8 Dec 2021 at 00:32, David Li <lidav...@apache.org> wrote:
> Just for edification (though I have limited understanding of the machinery here, someone more familiar with Pandas internals may have more insight/this may be wrong or very outdated!):
>
> zero_copy_only does not work for two reasons (well, one reason fundamentally): the in-memory representation of a Pandas dataframe has historically been a dense, 2D NumPy array per column type. In other words, all data across all columns of the same type are contiguous in memory. (At least historically; my understanding is that this has changed/become more flexible relatively recently.) This is the representation that Arrow tries to generate by default. (See https://uwekorn.com/2020/05/24/the-one-pandas-internal.html.)
>

Just to confirm: your explanation is fully correct. (I have been working on changing this in pandas (https://github.com/pandas-dev/pandas/issues/39146/), but it is currently only an experimental opt-in feature.)
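To make that block layout concrete, here is a small sketch of mine (it relies on the internal `_mgr.nblocks` attribute, which is not public pandas API and may change between versions) showing the difference between the default consolidating conversion and split_blocks=True:

import pyarrow as pa

# All columns share one dtype, so the default to_pandas() conversion
# consolidates them into a single dense 2D block (one big copy).
table = pa.table({f'c{i}': pa.array(range(1000), type=pa.int64()) for i in range(4)})

consolidated = table.to_pandas()             # columns copied into one 2D block
split = table.to_pandas(split_blocks=True)   # one allocation per column

# pandas internals, shown only for illustration:
print(consolidated._mgr.nblocks)  # expected: 1
print(split._mgr.nblocks)         # expected: 4
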
> However, the Arrow table you have is not contiguous: each column is allocated separately, and for a Table, each column is made up of a list of contiguous chunks. So there are very few cases where data can be zero-copied; it must instead be copied and "compacted".
>
> The split_blocks option *helps* work around this. It allows each column in the Pandas DataFrame to be its own allocation. However, each individual column must still be contiguous. If you try zero_copy_only with split_blocks, you'll get a different error message, because the columns of your Arrow Table have more than one chunk. If you create a small in-memory Table with only one column with one chunk, zero_copy_only + split_blocks will work!
>
> split_blocks with self_destruct still works in this case because self_destruct will still copy data; it will just also try to free the Arrow data as each column is converted. (Note that to minimize the memory usage, you should also pass use_threads=False. In that case, the maximum memory overhead should be one column's worth.)
>
> -David
>
> On Tue, Dec 7, 2021, at 18:09, Weston Pace wrote:
>
> Thank you for the new example.
>
> # Why is it 2x?
>
> This is essentially a "peak RAM" usage of the operation. Given that split_blocks helped, I think we can attribute this doubling to the pandas conversion.
>
> # Why doesn't the memory get returned?
>
> It does, it just doesn't do so immediately. If I put a 5 second sleep before I print the memory, I see that the RSS shrinks down. This is how jemalloc is configured in Arrow (actually I think the delay is 1 second): it releases RSS some time after reaching peak consumption.
>
> BEFORE mem_size: 0.082276352gb
> AFTER: mem_size: 6.68639232gb df_size: 3.281625104gb
> AFTER-ARROW: 3.281625024gb
> ---five second sleep---
> AFTER-SLEEP: mem_size: 3.3795072gb df_size: 3.281625104gb
> AFTER-SLEEP-ARROW: 3.281625024gb
>
> # Why didn't switching to the system allocator help?
>
> The problem isn't "the dynamic allocator is allocating more than it needs". There is a point in this process where ~6GB are actually needed. The system allocator either also holds on to that RSS for a little bit, or the RSS numbers themselves take a little bit of time to update. I'm not entirely sure.
>
> # Why isn't this a zero-copy conversion to pandas?
>
> That's a good question, I don't know the details. If I try manually doing the conversion with zero_copy_only I get the error "Cannot do zero copy conversion into multi-column DataFrame block".
>
> # What is up with the numpy.ndarray objects in the heap?
>
> I'm pretty sure guppy3 is double-counting. Note that the total size is ~20GB. I've been able to reproduce this in cases where the heap is 3GB and guppy still shows the dataframe taking up 6GB. In fact, I once even managed to generate this:
>
> AFTER-SLEEP: mem_size: 3.435835392gb df_size: 3.339197344gb
> AFTER-SLEEP-ARROW: 0.0gb
> Partition of a set of 212560 objects. Total size = 13328742559 bytes.
>  Index  Count  %        Size   %   Cumulative   %  Kind (class / dict of class)
>      0     57  0  6563250864  49   6563250864  49  pandas.core.series.Series
>      1    133  0  3339213718  25   9902464582  74  numpy.ndarray
>      2      1  0  3339197360  25  13241661942  99  pandas.core.frame.DataFrame
>
> The RSS is 3.44GB but guppy reports the dataframe as 13GB.
>
> I did see some strange behavior when working with the RecordBatchFileReader and I opened ARROW-15017 to resolve this, but you can work around this by deleting the reader.
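Coming back to the chunking constraint described above, a small sketch to make it concrete (assuming an int64 column with no nulls; the exact exception type may vary):

import pyarrow as pa

single = pa.table({'a': pa.array(range(1_000_000), type=pa.int64())})
chunked = pa.concat_tables([single, single])   # each column now has two chunks

# One column, one chunk, no nulls: zero-copy conversion is possible.
df_ok = single.to_pandas(zero_copy_only=True, split_blocks=True)

try:
    chunked.to_pandas(zero_copy_only=True, split_blocks=True)
except pa.ArrowInvalid as exc:
    # Multiple chunks per column force a copy, so zero_copy_only refuses.
    print(exc)

# combine_chunks() compacts each column into a single contiguous chunk
# (one explicit copy), after which the zero-copy conversion works again.
df_ok2 = chunked.combine_chunks().to_pandas(zero_copy_only=True, split_blocks=True)
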
> # Can I return the data immediately / I don't want to use 2x memory consumption
>
> I think split_blocks and self_destruct is the best answer at the moment. self_destruct has remained in the code since at least 1.0.0 so perhaps it is time we remove the "experimental" flag and maybe replace it with a "caution" or "danger" flag (as it causes the table to become unusable afterwards).
>
> Jemalloc has some manual facilities to purge dirty memory and we expose some of them with pyarrow.default_memory_pool().release_unused() but that doesn't seem to be helping in this situation. Either the excess memory is in the non-jemalloc pool or the jemalloc command can't quite release this memory, or the RSS stats are just stale. I'm not entirely sure.
>
> On Tue, Dec 7, 2021 at 11:54 AM Arun Joseph <ajos...@gmail.com> wrote:
> >
> > Slightly related, I have some other code that opens up an arrow file using a `pyarrow.ipc.RecordBatchFileReader` and then converts RecordBatch to a pandas dataframe. After this conversion is done, and I inspect the heap, I always see the following:
> >
> > hpy().heap()
> > Partition of a set of 351136 objects. Total size = 20112096840 bytes.
> >  Index  Count  %        Size   %   Cumulative    %  Kind (class / dict of class)
> >      0    121  0  9939601034  49   9939601034   49  numpy.ndarray
> >      1      1  0  9939585700  49  19879186734   99  pandas.core.frame.DataFrame
> >      2      1  0   185786680   1  20064973414  100  pandas.core.indexes.datetimes.DatetimeIndex
> >
> > Specifically the numpy.ndarray. It only shows up after the conversion and it does not seem to go away. It also seems to be roughly the same size as the dataframe itself.
> >
> > - Arun
> >
> > On Tue, Dec 7, 2021 at 10:21 AM Arun Joseph <ajos...@gmail.com> wrote:
> >>
> >> Just to follow up on this, is there a way to manually force the arrow pool to de-allocate? My usecase is essentially having multiple processes in a Pool or via Slurm read from an arrow file, do some work, and then exit. Issue is that the 2x memory consumption reduces the bandwidth on the machine to effectively half.
> >>
> >> Thank You,
> >> Arun
> >>
> >> On Mon, Dec 6, 2021 at 10:38 AM Arun Joseph <ajos...@gmail.com> wrote:
> >>>
> >>> Additionally, I tested with my actual data, and did not see memory savings.
> >>>
> >>> On Mon, Dec 6, 2021 at 10:35 AM Arun Joseph <ajos...@gmail.com> wrote:
> >>>>
> >>>> Hi Joris,
> >>>>
> >>>> Thank you for the explanation. The 2x memory consumption on conversion makes sense if there is a copy, but it does seem like it persists longer than it should. Might be because of python's GC policies?
> >>>> I tried out your recommendations but they did not seem to work. However, I did notice an experimental option on `to_pandas`, `self_destruct`, which seems to address the issue I'm facing. Sadly, that itself did not work either...
> >>>> but, combined with `split_blocks=True`, I am seeing memory savings:
> >>>>
> >>>> import pandas as pd
> >>>> import numpy as np
> >>>> import pyarrow as pa
> >>>> from pyarrow import feather
> >>>> import os
> >>>> import psutil
> >>>> pa.set_memory_pool(pa.system_memory_pool())
> >>>> DATA_FILE = 'test.arrow'
> >>>>
> >>>> def setup():
> >>>>     np.random.seed(0)
> >>>>     df = pd.DataFrame(np.random.randint(0,100,size=(7196546, 57)), columns=list([f'{i}' for i in range(57)]))
> >>>>     df.to_feather(DATA_FILE)
> >>>>     print(f'wrote {DATA_FILE}')
> >>>>     import sys
> >>>>     sys.exit()
> >>>>
> >>>> if __name__ == "__main__":
> >>>>     # setup()
> >>>>     process = psutil.Process(os.getpid())
> >>>>     path = DATA_FILE
> >>>>
> >>>>     mem_size = process.memory_info().rss / 1e9
> >>>>     print(f'BEFORE mem_size: {mem_size}gb')
> >>>>
> >>>>     feather_table = feather.read_table(path)
> >>>>     # df = feather_table.to_pandas(split_blocks=True)
> >>>>     # df = feather_table.to_pandas()
> >>>>     df = feather_table.to_pandas(self_destruct=True, split_blocks=True)
> >>>>
> >>>>     mem_size = process.memory_info().rss / 1e9
> >>>>     df_size = df.memory_usage().sum() / 1e9
> >>>>     print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb')
> >>>>     print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / 1e9}gb')
> >>>>
> >>>> OUTPUT (to_pandas()):
> >>>> BEFORE mem_size: 0.091795456gb
> >>>> AFTER mem_size: 6.737887232gb df_size: 3.281625104gb
> >>>> ARROW: 3.281625024gb
> >>>>
> >>>> OUTPUT (to_pandas(split_blocks=True)):
> >>>> BEFORE mem_size: 0.091795456gb
> >>>> AFTER mem_size: 6.752907264gb df_size: 3.281625104gb
> >>>> ARROW: 3.281627712gb
> >>>>
> >>>> OUTPUT (to_pandas(self_destruct=True, split_blocks=True)):
> >>>> BEFORE mem_size: 0.091795456gb
> >>>> AFTER mem_size: 4.039512064gb df_size: 3.281625104gb
> >>>> ARROW: 3.281627712gb
> >>>>
> >>>> I'm guessing that since this feature is experimental, it might either go away or have strange behaviors. Is there anything I should look out for, or is there some alternative way to reproduce these results?
> >>>>
> >>>> Thank You,
> >>>> Arun
> >>>>
> >>>> On Mon, Dec 6, 2021 at 10:07 AM Joris Van den Bossche <jorisvandenboss...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Arun, Weston,
> >>>>>
> >>>>> I didn't try running the script locally, but a quick note: the `feather.read_feather` function reads the Feather file into an Arrow table and directly converts it to a pandas DataFrame. A memory consumption of 2x the size of the dataframe does not sound that unexpected to me: most of the time, when converting an Arrow table to a pandas DataFrame, the data will be copied to accommodate pandas' specific internal memory layout (at least numeric columns will be combined together in 2D arrays).
> >>>>>
> >>>>> To verify if this is the cause, you might want to do either of:
> >>>>> - use `feather.read_table` instead of `feather.read_feather`, which will read the file as an Arrow table instead (and doesn't do any conversion to pandas)
> >>>>> - if you want to include the conversion to pandas, also use `read_table` and do the conversion to pandas explicitly with a `to_pandas()` call on the result.
In that case, you can specify > >>>>> `split_blocks=True` to use more zero-copy conversion in the > >>>>> arrow->pandas conversion > >>>>> > >>>>> Joris > >>>>> > >>>>> On Mon, 6 Dec 2021 at 15:05, Arun Joseph <ajos...@gmail.com> wrote: > >>>>> > > >>>>> > Hi Wes, > >>>>> > > >>>>> > Sorry for the late reply on this, but I think I got a reproducible > test case: > >>>>> > > >>>>> > import pandas as pd > >>>>> > import numpy as np > >>>>> > import pyarrow as pa > >>>>> > from pyarrow import feather > >>>>> > import os > >>>>> > import psutil > >>>>> > pa.set_memory_pool(pa.system_memory_pool()) > >>>>> > DATA_FILE = 'test.arrow' > >>>>> > > >>>>> > def setup(): > >>>>> > np.random.seed(0) > >>>>> > df = pd.DataFrame(np.random.uniform(0,100,size=(7196546, 57)), > columns=list([f'i_{i}' for i in range(57)])) > >>>>> > df.to_feather(DATA_FILE) > >>>>> > print(f'wrote {DATA_FILE}') > >>>>> > import sys > >>>>> > sys.exit() > >>>>> > > >>>>> > if __name__ == "__main__": > >>>>> > # setup() > >>>>> > process = psutil.Process(os.getpid()) > >>>>> > path = DATA_FILE > >>>>> > > >>>>> > mem_size = process.memory_info().rss / 1e9 > >>>>> > print(f'BEFORE mem_size: {mem_size}gb') > >>>>> > > >>>>> > df = feather.read_feather(path) > >>>>> > > >>>>> > mem_size = process.memory_info().rss / 1e9 > >>>>> > df_size = df.memory_usage().sum() / 1e9 > >>>>> > print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb') > >>>>> > print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / > 1e9}gb') > >>>>> > > >>>>> > OUTPUT: > >>>>> > BEFORE mem_size: 0.091795456gb > >>>>> > AFTER mem_size: 6.762156032gb df_size: 3.281625104gb > >>>>> > ARROW: 3.281625024gb > >>>>> > > >>>>> > Let me know if you're able to see similar results. > >>>>> > > >>>>> > Thanks, > >>>>> > Arun > >>>>> > > >>>>> > On Fri, Dec 3, 2021 at 6:03 PM Weston Pace <weston.p...@gmail.com> > wrote: > >>>>> >> > >>>>> >> I get more or less the same results as you for the provided setup > data > >>>>> >> (exact same #'s for arrow & df_size and slightly different for RSS > >>>>> >> which is to be expected). The fact that the arrow size is much > lower > >>>>> >> than the dataframe size is not too surprising to me. If a column > >>>>> >> can't be zero copied then it's memory will disappear from the > arrow > >>>>> >> pool (I think). Plus, object columns will have overhead in pandas > >>>>> >> that they do not have in Arrow. > >>>>> >> > >>>>> >> The df_size issue for me seems to be tied to string columns. I > think > >>>>> >> pandas is overestimating how much size is needed there (many of my > >>>>> >> strings are similar and I wonder if some kind of object sharing is > >>>>> >> happening). But we can table this for another time. > >>>>> >> > >>>>> >> I tried writing my feather file with your parameters and it didn't > >>>>> >> have much impact on any of the numbers. > >>>>> >> > >>>>> >> Since the arrow size for you is expected (nearly the same as the > >>>>> >> df_size) I'm not sure what to investigate next. The memory does > not > >>>>> >> seem to be retained by Arrow. Is there any chance you could > create a > >>>>> >> reproducible test case using randomly generated numpy data (then > you > >>>>> >> could share that setup function)? > >>>>> >> > >>>>> >> On Fri, Dec 3, 2021 at 12:13 PM Arun Joseph <ajos...@gmail.com> > wrote: > >>>>> >> > > >>>>> >> > Hi Wes, > >>>>> >> > > >>>>> >> > I'm not including the setup() call when I encounter the issue. > I just kept it in there for ease of reproducibility. 
Memory usage is indeed > higher when it is included, but that isn't surprising. > >>>>> >> > > >>>>> >> > I tried switching over to the system allocator but there is no > change. > >>>>> >> > > >>>>> >> > I've updated to Arrow 6.0.1 as well and there is no change. > >>>>> >> > > >>>>> >> > I updated my script to also include the Arrow bytes allocated > and it gave me the following: > >>>>> >> > > >>>>> >> > MVE: > >>>>> >> > import pandas as pd > >>>>> >> > import pyarrow as pa > >>>>> >> > from pyarrow import feather > >>>>> >> > import os > >>>>> >> > import psutil > >>>>> >> > pa.set_memory_pool(pa.system_memory_pool()) > >>>>> >> > > >>>>> >> > > >>>>> >> > def setup(): > >>>>> >> > df = pd.read_csv(' > https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv > ') > >>>>> >> > df.to_feather('test.csv') > >>>>> >> > > >>>>> >> > if __name__ == "__main__": > >>>>> >> > # setup() > >>>>> >> > process = psutil.Process(os.getpid()) > >>>>> >> > path = 'test.csv' > >>>>> >> > > >>>>> >> > mem_size = process.memory_info().rss / 1e9 > >>>>> >> > print(f'BEFORE mem_size: {mem_size}gb') > >>>>> >> > > >>>>> >> > df = feather.read_feather(path) > >>>>> >> > > >>>>> >> > df_size = df.memory_usage(deep=True).sum() / 1e9 > >>>>> >> > mem_size = process.memory_info().rss / 1e10 > >>>>> >> > print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb') > >>>>> >> > print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / > 1e9}gb') > >>>>> >> > > >>>>> >> > Output with my data: > >>>>> >> > BEFORE mem_size: 0.08761344gb > >>>>> >> > AFTER mem_size: 6.297198592gb df_size: 3.080121688gb > >>>>> >> > ARROW: 3.080121792gb > >>>>> >> > > >>>>> >> > Output with Provided Setup Data: > >>>>> >> > BEFORE mem_size: 0.09179136gb > >>>>> >> > AFTER mem_size: 0.011487232gb df_size: 0.024564664gb > >>>>> >> > ARROW: 0.00029664gb > >>>>> >> > > >>>>> >> > I'm assuming that the df and the arrow bytes allocated/sizes > are distinct and non-overlapping, but it seems strange that the output with > the provided data has the Arrow bytes allocated at ~0GB whereas the one > with my data has the allocated data approximately equal to the dataframe > size. I'm not sure if it affects anything but my file was written with the > following: > >>>>> >> > > >>>>> >> > import pyarrow.lib as ext > >>>>> >> > import pyarrow > >>>>> >> > COMPRESSION_LEVEL = 19 > >>>>> >> > COMPRESSION_ALGO = 'zstd' > >>>>> >> > KILOBYTE = 1 << 10 > >>>>> >> > MEGABYTE = KILOBYTE * KILOBYTE > >>>>> >> > CHUNK_SIZE = MEGABYTE > >>>>> >> > > >>>>> >> > table = pyarrow.Table.from_pandas(df, > preserve_index=preserve_index) > >>>>> >> > ext.write_feather(table, dest, compression=compression, > compression_level=compression_level,chunksize=chunk_size, version=2) > >>>>> >> > > >>>>> >> > As to the discrepancy around calculating dataframe size. I'm > not sure why that would be so off for you. Going off the docs, it seems > like it should be accurate. My Dataframe in question is [7196546 rows x 56 > columns] where each column is mostly a float or integer and datetime index. > 7196546 * 56 * 8 = 3224052608 ~= 3.2GB which roughly aligns. > >>>>> >> > > >>>>> >> > Thank You, > >>>>> >> > Arun > >>>>> >> > > >>>>> >> > On Fri, Dec 3, 2021 at 4:36 PM Weston Pace < > weston.p...@gmail.com> wrote: > >>>>> >> >> > >>>>> >> >> 2x overshoot of memory does seem a little high. 
Are you > including the > >>>>> >> >> "setup" part when you encounter that? Arrow's file-based CSV > reader > >>>>> >> >> will require 2-3x memory usage because it buffers the bytes in > memory > >>>>> >> >> in case it needs to re-convert them later (because it realizes > the > >>>>> >> >> data type for the column is different). I'm not sure if > Panda's CSV > >>>>> >> >> reader is similar. > >>>>> >> >> > >>>>> >> >> Dynamic memory allocators (e.g. jemalloc) can cause Arrow to > hold on > >>>>> >> >> to a bit more memory and hold onto it (for a little while at > least) > >>>>> >> >> even after it is no longer used. Even malloc will hold onto > memory > >>>>> >> >> sometimes due to fragmentation or other concerns. You could > try > >>>>> >> >> changing to the system allocator > >>>>> >> >> (pa.set_memory_pool(pa.system_memory_pool()) at the top of > your file) > >>>>> >> >> to see if that makes a difference. > >>>>> >> >> > >>>>> >> >> I'm not sure your method of calculating the dataframe size is > >>>>> >> >> reliable. I don't actually know enough about pandas but when > I tried > >>>>> >> >> your experiment with my own 1.9G CSV file it ended up > reporting: > >>>>> >> >> > >>>>> >> >> AFTER mem_size: 2.348068864gb df_size: 4.519898461gb > >>>>> >> >> > >>>>> >> >> which seems suspicious. > >>>>> >> >> > >>>>> >> >> Anyways, my tests with my own CSV file (on Arrow 6.0.1) didn't > seem > >>>>> >> >> all that unexpected. There was 2.348GB of usage. Arrow > itself was > >>>>> >> >> only using ~1.9GB and I will naively assume the difference > between the > >>>>> >> >> two is bloat caused by object wrappers when converting to > pandas. > >>>>> >> >> > >>>>> >> >> Another thing you might try and measure is > >>>>> >> >> `pa.default_memory_pool().bytes_allocated()`. This will tell > you how > >>>>> >> >> much memory Arrow itself is hanging onto. If that is not 6GB > then it > >>>>> >> >> is a pretty good guess that memory is being held somewhere > else. > >>>>> >> >> > >>>>> >> >> On Fri, Dec 3, 2021 at 10:54 AM Arun Joseph <ajos...@gmail.com> > wrote: > >>>>> >> >> > > >>>>> >> >> > Hi Apache Arrow Members, > >>>>> >> >> > > >>>>> >> >> > My question is below but I've compiled a minimum > reproducible example with a public dataset: > >>>>> >> >> > > >>>>> >> >> > import pandas as pd > >>>>> >> >> > from pyarrow import feather > >>>>> >> >> > import os > >>>>> >> >> > import psutil > >>>>> >> >> > > >>>>> >> >> > > >>>>> >> >> > def setup(): > >>>>> >> >> > df = pd.read_csv(' > https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv > ') > >>>>> >> >> > df.to_feather('test.csv') > >>>>> >> >> > > >>>>> >> >> > if __name__ == "__main__": > >>>>> >> >> > # setup() > >>>>> >> >> > process = psutil.Process(os.getpid()) > >>>>> >> >> > path = 'test.csv' > >>>>> >> >> > > >>>>> >> >> > mem_size = process.memory_info().rss / 1e9 > >>>>> >> >> > print(f'BEFORE mem_size: {mem_size}gb') > >>>>> >> >> > > >>>>> >> >> > df = feather.read_feather(path) > >>>>> >> >> > > >>>>> >> >> > df_size = df.memory_usage(deep=True).sum() / 1e9 > >>>>> >> >> > mem_size = process.memory_info().rss / 1e9 > >>>>> >> >> > print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb') > >>>>> >> >> > > >>>>> >> >> > I substituted my df with a sample csv. I had trouble finding > a sample CSV of adequate size however, my dataset is ~3GB, and I see memory > usage of close to 6GB. 
> >>>>> >> >> > > >>>>> >> >> > Output with My Data: > >>>>> >> >> > BEFORE mem_size: 0.088891392gb > >>>>> >> >> > AFTER mem_size: 6.324678656gb df_size: 3.080121688gb > >>>>> >> >> > > >>>>> >> >> > It seems strange that the overall memory usage of the > process is approx double of the size of the dataframe itself. Is there a > reason for this, and is there a way to mitigate this? > >>>>> >> >> > > >>>>> >> >> > $ conda list pyarrow > >>>>> >> >> > # > >>>>> >> >> > # Name Version Build > Channel > >>>>> >> >> > pyarrow 4.0.1 > py37h0f64622_13_cpu conda-forge > >>>>> >> >> > > >>>>> >> >> > Thank You, > >>>>> >> >> > Arun Joseph > >>>>> >> >> > > >>>>> >> > > >>>>> >> > > >>>>> >> > > >>>>> >> > -- > >>>>> >> > Arun Joseph > >>>>> >> > > >>>>> > > >>>>> > > >>>>> > > >>>>> > -- > >>>>> > Arun Joseph > >>>>> > > >>>> > >>>> > >>>> > >>>> -- > >>>> Arun Joseph > >>>> > >>> > >>> > >>> -- > >>> Arun Joseph > >>> > >> > >> > >> -- > >> Arun Joseph > >> > > > > > > -- > > Arun Joseph > > > > >
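Putting the suggestions in this thread together, the lowest-memory path would look roughly like the sketch below (with the caveats already mentioned: self_destruct is experimental, it leaves the Table unusable afterwards, and release_unused() is only a best-effort hint to the allocator):

import pyarrow as pa
from pyarrow import feather

# Read the Feather file as an Arrow table (no pandas conversion yet).
table = feather.read_table('test.arrow')

# Convert with one block per column, freeing Arrow buffers as each column
# is converted; use_threads=False keeps the peak overhead to roughly one
# column's worth of memory.
df = table.to_pandas(split_blocks=True, self_destruct=True, use_threads=False)
del table  # the table must not be used again after self_destruct

# Best-effort request to return unused memory to the OS.
pa.default_memory_pool().release_unused()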