Apologies for the late reply on this. I just wanted to say thank you Weston for taking the time to provide that detailed explanation on memory mapping! (You should turn that into a blog post ha!)
Kind regards
Nikhil Makan

On Wed, Sep 21, 2022 at 5:21 PM Weston Pace <[email protected]> wrote:
>
> > The dataset API still makes use of multiple cores though correct?
>
> Yes. It tries to use enough threads to ensure it is using the full processor.
>
> > How does this then relate to the filesystems interface and native support for HDFS, GCFS and S3? Do these exhibit the same issue?
>
> No, they should not. I am not aware of the specifics of the Azure issue, but I know we can handle concurrent reads on the native S3 and GCFS filesystems, and I assume we can on HDFS, but I have never set HDFS up myself.
>
> > Further to this, as per my earlier discussions on this thread, we are unable to do partial reads of a blob in Azure. I wanted to know if that is possible with any of the other three that have native support, i.e. can we filter the data downloaded from these instead of downloading everything and then filtering?
>
> There are two questions here:
>
> 1. Can we read part of a file from the filesystem?
>
> Yes, all filesystem implementations that I am aware of support this.
>
> 2. Can we reduce the amount of data read by using a filter?
>
> Parquet is the only format that has some form of this. We can eliminate entire row groups from consideration if there is a pushdown filter and the row group statistics are informative enough. All formats (including Azure) support some pushdown through partitioning. If the files are stored in a partitioned manner we can possibly eliminate entire directories from consideration with a pushdown filter (e.g. if the filter is year==2004 then we can eliminate the directory `/year=2003/month=July`).
>
> There is more we could do here, both with parquet (page-level indices) and IPC (e.g. arrow/feather files, which have no statistics at the moment). We just need someone who has the time and energy to implement it.
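[To make the partition pruning above concrete, a minimal sketch using pyarrow's dataset API; the hive-style directory layout and column name here are hypothetical.]

```
import pyarrow.dataset as ds

# Hypothetical hive-style layout: data/year=2003/month=7/part-0.parquet, ...
dataset = ds.dataset("data/", format="parquet", partitioning="hive")

# With a filter on the partition column, whole directories such as
# data/year=2003/... can be skipped before any file contents are read.
table = dataset.to_table(filter=ds.field("year") == 2004)
```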
> > I don't think I quite follow this. Happy to be pointed to some documentation to read more on this by the way.
>
> I have yet to find a good guide that explains this so I'll make a brief attempt at clarification.
>
> 1. If you read in a buffer of data from the disk, and then you discard that buffer (e.g. delete it), then that physical memory can be returned to the OS (barring fragmentation) regardless of how it was read in (e.g. memory mapping or regular file).
> 2. If you read in a buffer of data from the disk, and you do not discard that buffer, then that physical memory cannot be returned to the OS unless it is swapped out. This is true regardless of how it was read in.
>
> A. Normal reads
>
> A normal read will first copy the buffer from the disk to the kernel page cache. The caller controls exactly when this I/O occurs because it will be during the call to `read()`. The kernel will then perform a memcpy from the kernel page cache to a user-space caller-provided buffer that the caller has allocated.
>
> ```
> // Caller provides the buffer
> void* my_buffer = malloc(100);
> // Read happens here, can control exactly when it happens
> read(fp, my_buffer, 100);
> ```
>
> B. Memory mapped reads
>
> A memory mapped read will first copy the buffer from the disk to the kernel page cache. The caller doesn't have much control over when this I/O occurs because it will happen whenever the kernel decides it needs the page in the page cache to be populated (usually when the user first tries to access the data and a page fault occurs). It will then give the user a pointer directly into the kernel page cache.
>
> ```
> // Buffer provided by the kernel, no read happens here
> void* my_buffer = mmap(...);
> // ...
> // ...
> // This line probably triggers a page fault and blocks while the data is read from the disk.
> int x = ((int*)my_buffer)[0];
> ```
>
> > I thought the basic idea behind memory mapping is that the data structure has the same representation on disk as it does in memory, therefore allowing it to not consume additional memory when reading it
>
> No. We cannot, for example, do arithmetic on data that is on the disk. The CPU requires that data first be brought into RAM.
>
> > So would the dataset API process multiple files potentially quicker without memory mapping?
>
> Probably not. If your data happened to already be fully in the kernel page cache then yes, the dataset API would probably process the file slightly faster. However, this would typically only be true if your entire dataset (or at least the working set you are dealing with) is smaller than your physical RAM and has been accessed recently. If the data is not already in the kernel page cache then memory mapping will probably have a negative effect. This is because the page faults come at unexpected times and can block the process at times we don't expect it to.
>
> > Also correct me if I am wrong, but memory mapping is related to the ipc format only, formats such as parquet cannot take advantage of this?
>
> Any format can use memory mapped I/O.
>
> So... why bother?
>
> Memory mapping is more typically used for IPC. In particular, it can be used to perform true zero-copy IPC. This IPC would only be true zero copy with the IPC format.
>
> Process A memory maps a file.
> Process A populates that region of memory with a table it generates in some way.
> Process A sends a control signal to process B that the table is ready.
> Process B memory maps the same file (we know it will be in the kernel page cache because we just used this RAM to write to it).
> Process B operates on the data in some way.
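[For context, a minimal sketch of that zero-copy pattern using pyarrow's IPC format; the file path and table contents are made up, and the two halves stand in for the two processes.]

```
import pyarrow as pa
import pyarrow.ipc as ipc

path = "/tmp/shared_table.arrow"  # hypothetical shared file

# "Process A": write a table to the file in the Arrow IPC (file) format.
table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})
with pa.OSFile(path, "wb") as sink:
    with ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

# "Process B": memory map the same file and read it back; the resulting
# buffers point into the mapping rather than being copied out.
with pa.memory_map(path, "r") as source:
    shared = ipc.open_file(source).read_all()
print(shared.num_rows)
```

The read here does not copy the buffers out of the mapping, which is what makes the hand-off zero-copy for the IPC format specifically.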
> On Tue, Sep 20, 2022 at 4:46 PM Nikhil Makan <[email protected]> wrote:
> >
> > Hi Weston, thanks for the response!
> >
> > > I would say that this is always a problem. In the datasets API the goal is to maximize the resource usage within a single process. Now, it may be a known or expected problem :)
> >
> > The dataset API still makes use of multiple cores though correct?
> > How does this then relate to the filesystems interface and native support for HDFS, GCFS and S3? Do these exhibit the same issue? Further to this, as per my earlier discussions on this thread, we are unable to do partial reads of a blob in Azure. I wanted to know if that is possible with any of the other three that have native support, i.e. can we filter the data downloaded from these instead of downloading everything and then filtering?
> >
> > > I think the benefits of memory mapping are rather subtle and often misleading. Datasets can make use of memory mapping for local filesystems. Doing so will, at best, have a slight performance benefit (avoiding a memcpy) but would most likely decrease performance (by introducing I/O where it is not expected) and it will have no effect whatsoever on the amount of RAM used.
> >
> > I don't think I quite follow this. Happy to be pointed to some documentation to read more on this by the way. I thought the basic idea behind memory mapping is that the data structure has the same representation on disk as it does in memory, therefore allowing it to not consume additional memory when reading it, which is typical with normal I/O operations when reading files. So would the dataset API process multiple files potentially quicker without memory mapping? Also correct me if I am wrong, but memory mapping is related to the ipc format only; formats such as parquet cannot take advantage of this?
> >
> > Kind regards
> > Nikhil Makan
> >
> > On Tue, Sep 20, 2022 at 5:12 AM Weston Pace <[email protected]> wrote:
> >>
> >> Sorry for the slow reply.
> >>
> >> > This could be something on the Azure side but I find I am being bottlenecked on the download speed and have noticed if I spin up multiple Python sessions (or in my case interactive windows) I can increase my throughput. Hence I can download each year of the taxinyc dataset in separate interactive windows and increase my bandwidth consumed.
> >>
> >> I would say that this is always a problem. In the datasets API the goal is to maximize the resource usage within a single process. Now, it may be a known or expected problem :)
> >>
> >> > Does the Dataset API make use of memory mapping? Do I have the correct understanding that memory mapping is only intended for dealing with large data stored on a local file system, whereas data stored on a cloud file system in the feather format effectively cannot be memory mapped?
> >>
> >> I think the benefits of memory mapping are rather subtle and often misleading. Datasets can make use of memory mapping for local filesystems. Doing so will, at best, have a slight performance benefit (avoiding a memcpy) but would most likely decrease performance (by introducing I/O where it is not expected) and it will have no effect whatsoever on the amount of RAM used.
> >>
> >> > This works as well, as noted previously, so I assume the Python operators are mapped across, similar to what happens when you use the operators against a numpy or pandas series: it just executes np.multiply or pd.multiply in the background.
> >>
> >> Yes. However the functions that get mapped can sometimes be surprising. Specifically, logical operations map to the _kleene variation and arithmetic maps to the _checked variation. You can find the implementation at [1]. For multiplication this boils down to:
> >>
> >> ```
> >> @staticmethod
> >> cdef Expression _expr_or_scalar(object expr):
> >>     if isinstance(expr, Expression):
> >>         return (<Expression> expr)
> >>     return (<Expression> Expression._scalar(expr))
> >>
> >> ...
> >>
> >> def __mul__(Expression self, other):
> >>     other = Expression._expr_or_scalar(other)
> >>     return Expression._call("multiply_checked", [self, other])
> >> ```
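[To make that mapping concrete, a minimal sketch; the column name is just an example. Building a projection with the Python operator produces a dataset Expression bound to the checked kernel, which is why `*1000` works inside `scanner(columns=...)` even though calling a `pyarrow.compute` function on an Expression raised a TypeError in the version discussed here.]

```
import pyarrow.dataset as ds

# Multiplying a field reference by a scalar builds an Expression; per the
# snippet above, __mul__ resolves to the "multiply_checked" kernel.
proj = ds.field("passengerCount") * 1000

# The expression can then be used as a projection, e.g.
# scanner(columns={"passengerCount": proj, ...}); no computation happens
# here, evaluation is deferred to the scan itself.
print(proj)
```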
> >> On Mon, Sep 19, 2022 at 12:52 AM Jacek Pliszka <[email protected]> wrote:
> >> >
> >> > Re 2. In Python Azure SDK there is logic for partial blob read:
> >> >
> >> > https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob
> >> >
> >> > However I was unable to use it as it does not support parquet files with decimal columns, and these are the ones I have.
> >> >
> >> > BR
> >> >
> >> > J
> >> >
> >> > On Fri, Sep 16, 2022 at 2:26 AM Aldrin <[email protected]> wrote:
> >> > >
> >> > > For Question 2:
> >> > > At a glance, I don't see anything in adlfs or azure that is able to do partial reads of a blob. If you're using block blobs, then likely you would want to store blocks of your file as separate blocks of a blob, and then you can do partial data transfers that way. I could be misunderstanding the SDKs or how Azure stores data, but my guess is that a whole blob is retrieved and then the local file is able to support partial, block-based reads as you expect from local filesystems. You may be able to double check how much data is being retrieved by looking at where adlfs is mounting your blob storage.
> >> > >
> >> > > For Question 3:
> >> > > You can memory map remote files, it's just that every page fault will be even more expensive than for local files. I am not sure how to tell the dataset API to do memory mapping, and I'm not sure how well that would work over adlfs.
> >> > >
> >> > > For Question 4:
> >> > > Can you try using `pc.scalar(1000)` as shown in the first code excerpt in [1]:
> >> > >
> >> > > >>> x, y = pa.scalar(7.8), pa.scalar(9.3)
> >> > > >>> pc.multiply(x, y)
> >> > > <pyarrow.DoubleScalar: 72.54>
> >> > >
> >> > > [1]: https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
> >> > >
> >> > > Aldrin Montana
> >> > > Computer Science PhD Student
> >> > > UC Santa Cruz
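[For reference, a minimal sketch of a ranged download with the azure-storage-blob v12 SDK, which is one way to fetch only part of a blob at the SDK level; the connection string, container, and blob names are placeholders.]

```
from azure.storage.blob import BlobServiceClient

# Placeholder credentials and paths.
service = BlobServiceClient.from_connection_string("<connection-string>")
blob = service.get_blob_client(container="taxinyc", blob="green/2019.feather")

# Download only the first 64 KiB of the blob instead of the whole object.
header = blob.download_blob(offset=0, length=64 * 1024).readall()
print(len(header))
```

Whether pyarrow's scanner actually issues ranged requests through adlfs is the open question in this thread; the sketch only shows that the underlying SDK supports them.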
> >> > > On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <[email protected]> wrote:
> >> > >>
> >> > >> Hi There,
> >> > >>
> >> > >> I have been experimenting with Tabular Datasets for data that can be larger than memory and had a few questions related to what's going on under the hood and how to work with it (I understand it is still experimental).
> >> > >>
> >> > >> Question 1: Reading Data from Azure Blob Storage
> >> > >> Now I know the filesystems don't fully support this yet, but there is an fsspec-compatible library (adlfs) which is shown in the file system example, which I have used. Example below with the nyc taxi dataset, where I am pulling the whole dataset through and writing to disk in the feather format.
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
> >> > >>
> >> > >> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
> >> > >>
> >> > >> scanner = dataset.scanner()
> >> > >> ds.write_dataset(scanner, f'taxinyc/green/feather/', format='feather')
> >> > >>
> >> > >> This could be something on the Azure side but I find I am being bottlenecked on the download speed and have noticed if I spin up multiple Python sessions (or in my case interactive windows) I can increase my throughput. Hence I can download each year of the taxinyc dataset in separate interactive windows and increase my bandwidth consumed. The tabular dataset documentation notes 'optionally parallel reading.' Do you know how I can control this? Or perhaps control the number of concurrent connections? Or has this got nothing to do with Arrow and sits purely on the Azure side? I have increased the io thread count from the default 8 to 16 and saw no difference, but could still spin up more interactive windows to maximise bandwidth.
> >> > >>
> >> > >> Question 2: Reading Filtered Data from Azure Blob Storage
> >> > >> Unfortunately I don't quite have a repeatable example here. However I am using the same data as above, only this time I have each year as a feather file instead of a parquet file. I have uploaded this to my own Azure blob storage account.
> >> > >> I am trying to read a subset of this data from the blob storage by selecting columns and filtering the data. The final result should be a dataframe that takes up around 240 MB of memory (I have tested this by working with the data locally). However, when I run this by connecting to the Azure blob storage it takes over an hour to run and it's clear it's downloading a lot more data than I would have thought. Given the file format is feather, which supports random access, I would have thought I would only have to download the 240 MB?
> >> > >>
> >> > >> Is there more going on in the background? Perhaps I am using this incorrectly?
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> connection_string = ''
> >> > >> fs = adlfs.AzureBlobFileSystem(connection_string=connection_string,)
> >> > >>
> >> > >> ds_f = ds.dataset("taxinyc/green/feather/", format='feather')
> >> > >>
> >> > >> df = (
> >> > >>     ds_f
> >> > >>     .scanner(
> >> > >>         columns={  # Selections and Projections
> >> > >>             'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>             'tripDistance': ds.field(('tripDistance'))
> >> > >>         },
> >> > >>         filter=(ds.field('vendorID') == 1)
> >> > >>     )
> >> > >>     .to_table()
> >> > >>     .to_pandas()
> >> > >> )
> >> > >>
> >> > >> df.info()
> >> > >>
> >> > >> Question 3: How is memory mapping being applied?
> >> > >> Does the Dataset API make use of memory mapping? Do I have the correct understanding that memory mapping is only intended for dealing with large data stored on a local file system, whereas data stored on a cloud file system in the feather format effectively cannot be memory mapped?
> >> > >>
> >> > >> Question 4: Projections
> >> > >> I noticed in the scanner function that when projecting a column I am unable to use any compute functions (I get a TypeError: only other expressions allowed as arguments) yet I am able to multiply this using standard Python arithmetic.
> >> > >>
> >> > >> 'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>
> >> > >> 'passengerCount': pc.multiply(ds.field(('passengerCount')),1000),
> >> > >>
> >> > >> Is this correct or am I to process this using an iterator via record batch to do this out of core? Is it actually even doing it out of core with "*1000"?
> >> > >>
> >> > >> Thanks for your help in advance. I have been following the Arrow project for the last two years but have only recently decided to dive into it in depth to explore it for various use cases. I am particularly interested in the out-of-core data processing and the interaction with cloud storage to retrieve only a selection of data from feather files. Hopefully at some point when I have enough knowledge I can contribute to this amazing project.
> >> > >>
> >> > >> Kind regards
> >> > >> Nikhil Makan
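[A footnote to Question 1: the I/O thread count mentioned there ("from the default 8 to 16") is a global pyarrow setting; a minimal sketch, with 16 chosen arbitrarily.]

```
import pyarrow as pa

# The I/O thread pool governs how many reads/downloads run concurrently.
print(pa.io_thread_count())  # defaults to 8
pa.set_io_thread_count(16)
```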
