Apologies for the late reply on this. I just wanted to say thank you, Weston,
for taking the time to provide that detailed explanation of memory
mapping!  (You should turn that into a blog post, ha!)

Kind regards
Nikhil Makan

On Wed, Sep 21, 2022 at 5:21 PM Weston Pace <[email protected]> wrote:

> > The dataset API still makes use of multiple cores though correct?
>
> Yes.  It tries to use enough threads to ensure it is using the full
> processor.
>
> > How does this then relate to the filesystems interface and native
> support for HDFS, GCFS and S3. Do these exhibit the same issue?
>
> No, they should not.  I am not aware of the specifics of the Azure
> issue but I know we can handle concurrent reads on the native S3 and
> GCFS filesystems and I assume we can on HDFS but I have never set HDFS
> up myself.
>
> > Further to this, as per my earlier discussions on this thread, we are
> unable to do partial reads of a blob in Azure. I wanted to know if that is
> possible with any of the other three that have native support, i.e. can we
> filter the data downloaded from these instead of downloading everything and
> then filtering?
>
> There are two questions here:
>
> 1. Can we read part of a file from the filesystem?
>
> Yes, all filesystem implementations that I am aware of support this.
>
> 2. Can we reduce the amount of data read by using a filter?
>
> Parquet is the only format that has some form of this.  We can
> eliminate entire row groups from consideration if there is a pushdown
> filter and the row group statistics are informative enough.  All
> formats (on any filesystem, including Azure) support some pushdown
> through partitioning.  If the files are stored in a partitioned manner
> we can possibly eliminate entire directories from consideration with a
> pushdown filter (e.g. if the filter is year==2004 then we can
> eliminate the directory `/year=2003/month=July`).
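
To make the directory elimination described above concrete, here is a minimal pure-Python sketch of the idea. It is not how Arrow implements partition pruning. The helper names `parse_partition_keys` and `prune_paths` and the file names are made up for illustration, and it only handles a simple equality filter on hive-style `key=value` directories.

```python
from pathlib import PurePosixPath


def parse_partition_keys(path):
    """Extract hive-style key=value segments from a path."""
    keys = {}
    for segment in PurePosixPath(path).parts:
        if "=" in segment:
            key, _, value = segment.partition("=")
            keys[key] = value
    return keys


def prune_paths(paths, column, wanted):
    """Keep only the paths that could hold rows where column == wanted."""
    kept = []
    for path in paths:
        keys = parse_partition_keys(path)
        # Skip a path only when it names the column with a different value.
        if column in keys and keys[column] != str(wanted):
            continue
        kept.append(path)
    return kept


# Hypothetical file layout, partitioned by year and month.
paths = [
    "/year=2003/month=July/part-0.parquet",
    "/year=2004/month=January/part-0.parquet",
    "/year=2004/month=July/part-0.parquet",
]

# With the filter year == 2004, nothing under /year=2003 needs to be opened.
remaining = prune_paths(paths, "year", 2004)
print(remaining)
```

The point is that the decision is made from the path alone, so the skipped files are never downloaded or opened.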
>
> There is more we could do here, both with parquet (page-level indices)
> and IPC (e.g. arrow/feather files which have no statistics at the
> moment).  We just need someone that has the time and energy to
> implement it.
>
> > I don't think I quite follow this. Happy to be pointed to some
> documentation to read more on this by the way.
>
> I have yet to find a good guide that explains this so I'll make a
> brief attempt at clarification.
>
> 1. If you read in a buffer of data from the disk, and then you discard
> that buffer (e.g. delete it), then that physical memory can be
> returned to the OS (barring fragmentation) regardless of how it was
> read in (e.g. memory mapping or regular file).
> 2. If you read in a buffer of data from the disk, and you do not
> discard that buffer, then that physical memory cannot be returned to
> the OS unless it is swapped out.  This is true regardless of how it
> was read in.
>
> A. Normal reads
>
> A normal read will first copy the buffer from the disk to the kernel
> page cache.  The caller controls exactly when this I/O occurs because
> it will be during the call to `read()`.  The kernel will then perform
> a memcpy from the kernel page cache to a user-space caller-provided
> buffer that the caller has allocated.
>
> ```
> // Caller provides the buffer
> void* my_buffer = malloc(100);
> // Read happens here, so the caller controls exactly when it happens
> // (fd is an already-open file descriptor)
> read(fd, my_buffer, 100);
> ```
>
> B. Memory mapped reads
>
> A memory mapped read will first copy the buffer from the disk to the
> kernel page cache.  The caller doesn't have much control over when
> this I/O occurs because it will happen whenever the kernel decides it
> needs the page in the page cache to be populated (usually when the
> user first tries to access the data and a page fault occurs).  It will
> then give the user a pointer directly into the kernel page cache.
>
> ```
> // Buffer provided by the kernel, no read happens here
> void* my_buffer = mmap(...);
> // ...
> // ...
> // This line probably triggers a page fault and blocks while the data
> // is read from the disk.
> int x = ((int*)my_buffer)[0];
> ```
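
For readers who work in Python rather than C, the same contrast between a normal read and a memory mapped read can be sketched with the standard library `mmap` module. This is only an illustration under assumptions: the scratch file and its contents are made up, and because the file was just written it is likely already in the page cache, so no real disk I/O may occur here.

```python
import mmap
import os
import struct
import tempfile

# Create a small scratch file holding 1024 little-endian 32-bit ints.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.write(struct.pack("<1024i", *range(1024)))

# A. Normal read: the I/O happens during read(), and the bytes are
# copied into a buffer owned by this process.
with open(path, "rb") as f:
    copied = f.read(4)
first_via_read = struct.unpack("<i", copied)[0]

# B. Memory mapped read: creating the mapping does not read the data.
# The first access to a mapped page is what may trigger a page fault.
with open(path, "rb") as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
        first_via_mmap = struct.unpack_from("<i", mapped, 0)[0]
        last_via_mmap = struct.unpack_from("<i", mapped, 4 * 1023)[0]

os.remove(path)
print(first_via_read, first_via_mmap, last_via_mmap)
```

Both paths see the same bytes. What differs is when the I/O happens and whether the data is copied into a caller-owned buffer.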
>
> > I thought the basic idea behind memory mapping is that the data
> structure has the same representation on disk as it does in memory
> therefore allowing it to not consume additional memory when reading it
>
> No.  We cannot, for example, do arithmetic on data that is on the
> disk.  The CPU requires that data first be brought into RAM.
>
> > So would the dataset API process multiple files potentially quicker
> without memory mapping?
>
> Probably not.  If your data happened to already be fully in the kernel
> page cache then yes, with memory mapping the dataset API would probably
> process the file slightly faster.  However, this would typically only
> be true if your entire dataset (or at least the working set you are
> dealing with) is smaller than your physical RAM and has been accessed
> recently.  If the data is not already in the kernel page cache then
> memory mapping will probably have a negative effect.  This is because
> the page faults come at unexpected times and can block the process
> when we don't expect it.
>
> > Also correct me if I am wrong, but memory mapping is related to the ipc
> format only, formats such as parquet cannot take advantage of this?
>
> Any format can use memory mapped I/O.
>
> SO....why bother?
>
> Memory mapping is more typically used for IPC.  In particular, it can
> be used to perform true zero-copy IPC.  This IPC would only be true
> zero copy with the IPC format.
>
> Process A memory maps a file.
> Process A populates that region of memory with a table it generates in
> some way.
> Process A sends a control signal to process B that the table is ready.
> Process B memory maps the same file (we know it will be in the kernel
> page cache because we just used this RAM to write to it).
> Process B operates on the data in some way.
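
The sequence above can be sketched with the Python standard library `mmap` module. This is a simplified illustration, not Arrow code: both "processes" are played by a single Python process for brevity, the control signal is omitted, and the payload is a placeholder byte string rather than a real IPC-format table.

```python
import mmap
import os
import tempfile

SIZE = 4096
payload = b"table bytes produced by process A"  # placeholder for a real table

# Pre-size the backing file so that it can be mapped.
fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as f:
    f.truncate(SIZE)

# "Process A": map the file and write the payload into the mapping.
with open(path, "r+b") as f_a:
    writer = mmap.mmap(f_a.fileno(), SIZE)
    writer[:len(payload)] = payload

    # (A real setup would signal process B here.)

    # "Process B": map the same file and read the payload back.
    with open(path, "rb") as f_b:
        reader = mmap.mmap(f_b.fileno(), SIZE, access=mmap.ACCESS_READ)
        received = bytes(reader[:len(payload)])
        reader.close()

    writer.close()

os.remove(path)
print(received)
```

Because both mappings are backed by the same pages in the kernel page cache, the reader sees what the writer stored without the data going through a separate `write()` and `read()` round trip.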
>
> On Tue, Sep 20, 2022 at 4:46 PM Nikhil Makan <[email protected]>
> wrote:
> >
> > Hi Weston, thanks for the response!
> >
> > > I would say that this is always a problem.  In the datasets API the
> > goal is to maximize the resource usage within a single process.  Now,
> > it may be a known or expected problem :)
> >
> > The dataset API still makes use of multiple cores though correct?
> > How does this then relate to the filesystems interface and native
> support for HDFS, GCFS and S3. Do these exhibit the same issue? Further to
> this, as per my earlier discussions on this thread, we are unable to do
> partial reads of a blob in Azure. I wanted to know if that is possible with
> any of the other three that have native support, i.e. can we filter the
> data downloaded from these instead of downloading everything and then
> filtering?
> >
> > > I think the benefits of memory mapping are rather subtle and often
> > misleading.  Datasets can make use of memory mapping for local
> > filesystems.  Doing so will, at best, have a slight performance
> > benefit (avoiding a memcpy) but would most likely decrease performance
> > (by introducing I/O where it is not expected) and it will have no
> > effect whatsoever on the amount of RAM used.
> >
> > I don't think I quite follow this. Happy to be pointed to some
> documentation to read more on this by the way. I thought the basic idea
> behind memory mapping is that the data structure has the same
> representation on disk as it does in memory therefore allowing it to not
> consume additional memory when reading it, which is typical with normal I/O
> operations with reading files. So would the dataset API process multiple
> files potentially quicker without memory mapping. Also correct me if I am
> wrong, but memory mapping is related to the ipc format only, formats such
> as parquet cannot take advantage of this?
> >
> > Kind regards
> > Nikhil Makan
> >
> >
> > On Tue, Sep 20, 2022 at 5:12 AM Weston Pace <[email protected]>
> wrote:
> >>
> >> Sorry for the slow reply.
> >>
> >> > This could be something on the Azure side but I find I am being
> bottlenecked on the download speed and have noticed if I spin up multiple
> Python sessions (or in my case interactive windows) I can increase my
> throughput. Hence I can download each year of the taxinyc dataset in
> separate interactive windows and increase my bandwidth consumed.
> >>
> >> I would say that this is always a problem.  In the datasets API the
> >> goal is to maximize the resource usage within a single process.  Now,
> >> it may be a known or expected problem :)
> >>
> >> > Does the Dataset API make use of memory mapping? Do I have the
> correct understanding that memory mapping is only intended for dealing with
> large data stored on a local file system, whereas data stored on a cloud
> file system in the feather format effectively cannot be memory mapped?
> >>
> >> I think the benefits of memory mapping are rather subtle and often
> >> misleading.  Datasets can make use of memory mapping for local
> >> filesystems.  Doing so will, at best, have a slight performance
> >> benefit (avoiding a memcpy) but would most likely decrease performance
> >> (by introducing I/O where it is not expected) and it will have no
> >> effect whatsoever on the amount of RAM used.
> >>
> >> > This works as well as noted previously, so I assume the python
> operators are mapped across similar to what happens when you use the
> operators against a numpy or pandas series, where it just executes a
> np.multiply or pd.multiply in the background.
> >>
> >> Yes.  However the functions that get mapped can sometimes be
> >> surprising.  Specifically, logical operations map to the _kleene
> >> variation and arithmetic maps to the _checked variation.  You can find
> >> the implementation at [1].  For multiplication this boils down to:
> >>
> >> ```
> >> @staticmethod
> >> cdef Expression _expr_or_scalar(object expr):
> >>     if isinstance(expr, Expression):
> >>         return (<Expression> expr)
> >>     return (<Expression> Expression._scalar(expr))
> >>
> >> ...
> >>
> >> def __mul__(Expression self, other):
> >>     other = Expression._expr_or_scalar(other)
> >>     return Expression._call("multiply_checked", [self, other])
> >> ```
> >>
> >>
> >> On Mon, Sep 19, 2022 at 12:52 AM Jacek Pliszka <[email protected]>
> wrote:
> >> >
> >> > Re 2.   In Python Azure SDK there is logic for partial blob read:
> >> >
> >> >
> https://learn.microsoft.com/en-us/python/api/azure-storage-blob/azure.storage.blob.blobclient?view=azure-python#azure-storage-blob-blobclient-query-blob
> >> >
> >> > However I was unable to use it as it does not support parquet files
> >> > with decimal columns and these are the ones I have.
> >> >
> >> > BR
> >> >
> >> > J
> >> >
> >> > On Fri, Sep 16, 2022 at 02:26 Aldrin <[email protected]> wrote:
> >> > >
> >> > > For Question 2:
> >> > > At a glance, I don't see anything in adlfs or azure that is able to
> do partial reads of a blob. If you're using block blobs, then likely you
> would want to store blocks of your file as separate blocks of a blob, and
> then you can do partial data transfers that way. I could be
> misunderstanding the SDKs or how Azure stores data, but my guess is that a
> whole blob is retrieved and then the local file is able to support partial,
> block-based reads as you expect from local filesystems. You may be able to
> double check how much data is being retrieved by looking at where adlfs is
> mounting your blob storage.
> >> > >
> >> > > For Question 3:
> >> > > you can memory map remote files, it's just that every page fault
> will be even more expensive than for local files. I am not sure how to tell
> the dataset API to do memory mapping, and I'm not sure how well that would
> work over adlfs.
> >> > >
> >> > > For Question 4:
> >> > > Can you try using `pc.scalar(1000)` as shown in the first code
> excerpt in [1]:
> >> > >
> >> > > >>> x, y = pa.scalar(7.8), pa.scalar(9.3)
> >> > > >>> pc.multiply(x, y)
> >> > > <pyarrow.DoubleScalar: 72.54>
> >> > >
> >> > > [1]:
> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
> >> > >
> >> > > Aldrin Montana
> >> > > Computer Science PhD Student
> >> > > UC Santa Cruz
> >> > >
> >> > >
> >> > > On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <
> [email protected]> wrote:
> >> > >>
> >> > >> Hi There,
> >> > >>
> >> > >> I have been experimenting with Tabular Datasets for data that can
> be larger than memory and had a few questions related to what's going on
> under the hood and how to work with it (I understand it is still
> experimental).
> >> > >>
> >> > >> Question 1: Reading Data from Azure Blob Storage
> >> > >> Now I know the filesystems don't fully support this yet, but there
> is an fsspec compatible library (adlfs) which is shown in the file system
> example which I have used. Example below with the nyc taxi dataset, where I
> am pulling the whole dataset through and writing to disk to the feather
> format.
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
> >> > >>
> >> > >> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
> >> > >>
> >> > >> scanner = dataset.scanner()
> >> > >> ds.write_dataset(scanner, 'taxinyc/green/feather/', format='feather')
> >> > >>
> >> > >> This could be something on the Azure side but I find I am being
> bottlenecked on the download speed and have noticed if I spin up multiple
> Python sessions (or in my case interactive windows) I can increase my
> throughput. Hence I can download each year of the taxinyc dataset in
> separate interactive windows and increase my bandwidth consumed. The
> tabular dataset documentation notes 'optionally parallel reading.' Do you
> know how I can control this? Or perhaps control the number of concurrent
> connections. Or has this got nothing to do with Arrow and sits purely
> on the Azure side? I have increased the io thread count from the default 8
> to 16 and saw no difference, but could still spin up more interactive
> windows to maximise bandwidth.
> >> > >>
> >> > >> Question 2: Reading Filtered Data from Azure Blob Storage
> >> > >> Unfortunately I don't quite have a repeatable example here.
> However using the same data above, only this time I have each year as a
> feather file instead of a parquet file. I have uploaded this to my own
> Azure blob storage account.
> >> > >> I am trying to read a subset of this data from the blob storage by
> selecting columns and filtering the data. The final result should be a
> dataframe that takes up around 240 mb of memory (I have tested this by
> working with the data locally). However when I run this by connecting to
> the Azure blob storage it takes over an hour to run and it's clear it's
> downloading a lot more data than I would have thought. Given the file
> formats are feather that supports random access I would have thought I
> would only have to download the 240 mb?
> >> > >>
> >> > >> Is there more going on in the background? Perhaps I am using this
> incorrectly?
> >> > >>
> >> > >> import adlfs
> >> > >> import pyarrow.dataset as ds
> >> > >>
> >> > >> connection_string = ''
> >> > >> fs = adlfs.AzureBlobFileSystem(connection_string=connection_string)
> >> > >>
> >> > >> ds_f = ds.dataset("taxinyc/green/feather/", format='feather')
> >> > >>
> >> > >> df = (
> >> > >>     ds_f
> >> > >>     .scanner(
> >> > >>         columns={ # Selections and Projections
> >> > >>             'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>             'tripDistance': ds.field(('tripDistance'))
> >> > >>         },
> >> > >>         filter=(ds.field('vendorID') == 1)
> >> > >>     )
> >> > >>     .to_table()
> >> > >>     .to_pandas()
> >> > >> )
> >> > >>
> >> > >> df.info()
> >> > >>
> >> > >> Question 3: How is memory mapping being applied?
> >> > >> Does the Dataset API make use of memory mapping? Do I have the
> correct understanding that memory mapping is only intended for dealing with
> large data stored on a local file system, whereas data stored on a cloud
> file system in the feather format effectively cannot be memory mapped?
> >> > >>
> >> > >> Question 4: Projections
> >> > >> I noticed in the scanner function when projecting a column I am
> unable to use any compute functions (I get a TypeError: only other
> expressions allowed as arguments) yet I am able to multiply this using
> standard python arithmetic.
> >> > >>
> >> > >> 'passengerCount': ds.field(('passengerCount'))*1000,
> >> > >>
> >> > >> 'passengerCount': pc.multiply(ds.field(('passengerCount')),1000),
> >> > >>
> >> > >> Is this correct or am I to process this using an iterator via
> record batch to do this out of core? Is it actually even doing it out of
> core with " *1000 ".
> >> > >>
> >> > >> Thanks for your help in advance. I have been following the Arrow
> project for the last two years but have only recently decided to dive into
> it in depth to explore it for various use cases. I am particularly
> interested in the out-of-core data processing and the interaction with
> cloud storages to retrieve only a selection of data from feather files.
> Hopefully at some point when I have enough knowledge I can contribute to
> this amazing project.
> >> > >>
> >> > >> Kind regards
> >> > >> Nikhil Makan
>
