Yes, it already supports concurrent I/O when downloading or uploading data.
You can configure an initial size threshold above which it switches to
concurrent download/upload, and there are also options to configure how big
each "part" is and how many concurrent tasks are allowed at a single time.

For downloads, we start with a HEAD request on the object to get its total
size (via the Content-Length header), then issue byte Range requests
according to the user-provided concurrency options.
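
A minimal sketch of that flow, in Python with the requests library rather
than the actual Julia implementation (PART_SIZE and MAX_CONCURRENCY are
made-up names standing in for the real configuration options):

import requests
from concurrent.futures import ThreadPoolExecutor

PART_SIZE = 8 * 1024 * 1024   # bytes per ranged request
MAX_CONCURRENCY = 8           # simultaneous range requests

def download(url):
    # 1. HEAD request to learn the total object size
    total = int(requests.head(url).headers["Content-Length"])

    # 2. Issue a byte Range GET for each part, concurrently
    def fetch(offset):
        end = min(offset + PART_SIZE, total) - 1
        resp = requests.get(url, headers={"Range": f"bytes={offset}-{end}"})
        return resp.content

    with ThreadPoolExecutor(MAX_CONCURRENCY) as pool:
        parts = list(pool.map(fetch, range(0, total, PART_SIZE)))

    # 3. Reassemble the parts in order
    return b"".join(parts)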

For uploads, we use the cloud-specific multipart upload APIs: each part is
uploaded separately, and then all the parts are committed in a final step.
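
A sketch of that same flow using boto3 against S3 (the bucket, key, file
name, and part size below are placeholders, not anything specific to the
Julia package):

import boto3

s3 = boto3.client("s3")
bucket, key = "my-bucket", "my-key"      # placeholders
part_size = 8 * 1024 * 1024              # parts must be >= 5 MB on S3

# 1. Start the multipart upload
mpu = s3.create_multipart_upload(Bucket=bucket, Key=key)

# 2. Upload each part (these calls can run concurrently)
parts = []
with open("data.bin", "rb") as f:        # placeholder local file
    part_number = 1
    while chunk := f.read(part_size):
        resp = s3.upload_part(Bucket=bucket, Key=key, PartNumber=part_number,
                              UploadId=mpu["UploadId"], Body=chunk)
        parts.append({"ETag": resp["ETag"], "PartNumber": part_number})
        part_number += 1

# 3. Final step: commit all the uploaded parts as a single object
s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=mpu["UploadId"],
                             MultipartUpload={"Parts": parts})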

-Jacob

On Sun, Oct 9, 2022 at 6:28 PM Nikhil Makan <[email protected]> wrote:

> Thanks Jacob for the comments. Appreciate it.
>
> Out of interest, does the cloud storage interface you are working on for
> Julia support concurrent I/O in order to improve performance when
> downloading/uploading data? I know that for PyArrow with blob storage we
> have to use an fsspec-compliant library, adlfs; however, concurrent I/O is
> still in the works with that library.
>
> Kind regards
> Nikhil Makan
>
> On Wed, Oct 5, 2022 at 7:49 PM Jacob Quinn <[email protected]> wrote:
>
>> Sorry for the late reply, but thought I'd chime in with a thought or two,
>> as I've had the chance to work on the Arrow.jl Julia implementation and,
>> more recently, on a consistent cloud storage interface (for S3, Azure, and
>> planned GCP, also for Julia;
>> https://github.com/JuliaServices/CloudStore.jl).
>>
>> First, to clarify, all cloud storage providers support "partial" reads in
>> the form of "ranged" HTTP requests (i.e. with a "Range" header like
>> "Range: bytes=0-9"). That means that, for a single "object" in a cloud
>> store, you can request specific byte ranges of that object to be returned.
>>
>> How would that interact with stored data files? Well, for the Arrow
>> IPC/Feather format, you could do a series of these "range" requests to read
>> a single Feather file's flatbuffer metadata, which contains the specific
>> byte offsets of columns within the data file. So in theory it should be
>> fairly straightforward to apply a kind of "column selection" operation
>> where only specific columns are actually downloaded from the cloud store,
>> avoiding downloading the entire file.
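>>
>> As a rough sketch of that first step (plain Python with the requests
>> library and a made-up URL; not how Arrow.jl or CloudStore.jl actually
>> implement it):
>>
>> import struct
>> import requests
>>
>> url = "https://example.blob.core.windows.net/data/file.feather"  # placeholder
>>
>> size = int(requests.head(url).headers["Content-Length"])
>>
>> # An Arrow IPC file ends with a 4-byte footer length followed by the
>> # 6-byte "ARROW1" magic, so the last 10 bytes locate the footer.
>> tail = requests.get(url, headers={"Range": f"bytes={size - 10}-{size - 1}"}).content
>> assert tail[4:] == b"ARROW1"
>> footer_len = struct.unpack("<i", tail[:4])[0]
>>
>> # Fetch just the flatbuffer footer; it holds the schema and the offsets of
>> # each record batch, from which a reader can work out which byte ranges
>> # contain the buffers of the selected columns and Range-request only those.
>> footer_start = size - 10 - footer_len
>> footer = requests.get(url, headers={"Range": f"bytes={footer_start}-{size - 11}"}).content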
>>
>> For other data formats it's generally not as applicable, since we don't
>> have this kind of specific byte information about where certain
>> rows/columns live within a single object.
>>
>> On the other hand, the parquet format supports a partitioning scheme that
>> *IS* more amenable to "partial reads", but in a slightly different way.
>> Instead of using HTTP Range requests, specific columns or row batches of
>> columns are stored as separate *objects* in the cloud store. So by doing a
>> "list" type of operation on all "objects" in the store, and reading the
>> overall metadata of the parquet data, we can similarly do a "column
>> selection" kind of operation by downloading only the specific *objects*
>> that correspond to the desired columns.
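>>
>> On the reading side, pyarrow's dataset API already exposes that pattern; a
>> sketch with hypothetical bucket/column names (partition pruning skips whole
>> objects, and the column projection means only the matching column chunks
>> are fetched from the remaining parquet files):
>>
>> import pyarrow.dataset as ds
>>
>> # hypothetical hive-partitioned layout, e.g. .../year=2019/month=1/part-0.parquet
>> dataset = ds.dataset("s3://my-bucket/nyc-taxi/", format="parquet",
>>                      partitioning="hive")
>>
>> table = dataset.to_table(columns=["passenger_count"],
>>                          filter=(ds.field("year") == 2019) & (ds.field("month") == 1))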
>>
>> Hopefully that provides a little bit of clarity?
>>
>> This is somewhat the overall vision that we're working towards with the
>> Julia implementation to hopefully provide really efficient interop with
>> cloud-stored data.
>>
>> -Jacob Quinn
>>
>>
>> On Sun, Sep 18, 2022 at 7:47 PM Nikhil Makan <[email protected]>
>> wrote:
>>
>>> Thanks Aldrin for the response on this.
>>>
>>> Question 1:
>>> For reference for anyone else who reads this, it appears adlfs does not
>>> yet support concurrent I/O; this is currently being developed:
>>> https://github.com/fsspec/adlfs/issues/268
>>>
>>> Question 2:
>>> Noted your points. I am using block blobs. If I am understanding you
>>> correctly, are you suggesting just splitting the data up into separate
>>> blobs, so that if I filter the data it only downloads the blobs that are
>>> required? This would seem to work only if you know beforehand what the
>>> filter could be, so you can split your data accordingly. However, if you
>>> wanted to return two columns of all the data, I assume this would still
>>> result in all the blobs being downloaded. I also came across
>>> https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python
>>> but this is only for csv/json files.
>>>
>>> Are you aware of partial reading that takes advantage of the columnar
>>> format (arrow/parquet) being implemented for any other storage, such as
>>> Google Cloud Storage or Amazon S3? I know PyArrow has native support for
>>> GCS and S3, however I ran the test example below against S3 and saw no
>>> real improvement. It seems to be the same issue where the whole file is
>>> downloaded.
>>>
>>> import pyarrow.dataset as ds
>>> ds_f = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/year=2019/month=1")
>>> ds_f.head(10)
>>>
>>> df = (
>>>     ds_f
>>>     .scanner(
>>>         columns={ # Selections and Projections
>>>             'passengerCount': ds.field(('passengerCount')),
>>>         },
>>>     )
>>>     .to_table()
>>>     .to_pandas()
>>> )
>>> df.info()
>>>
>>> Question 3:
>>> Thanks, noted.
>>>
>>> Question 4:
>>> Tried this:
>>> 'passengerCount': pc.multiply(ds.field(('passengerCount')), pa.scalar(1000))
>>> Same issue -> TypeError: only other expressions allowed as arguments
>>>
>>> However, it works with this:
>>> 'passengerCount': pc.multiply(ds.field(('passengerCount')), pc.scalar(1000))
>>>
>>> This works as well, as noted previously, so I assume the Python operators
>>> are mapped across, similar to what happens when you use operators against
>>> a NumPy or pandas series and it just executes np.multiply or pd.multiply
>>> in the background:
>>> 'passengerCount': ds.field(('passengerCount'))*1000,
>>>
>>> Kind regards
>>> Nikhil Makan
>>>
>>> On Fri, Sep 16, 2022 at 12:28 PM Aldrin <[email protected]> wrote:
>>>
>>>> (oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try
>>>> `pyarrow.scalar` per the documentation)
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>>
>>>> On Thu, Sep 15, 2022 at 5:26 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> For Question 2:
>>>>> At a glance, I don't see anything in adlfs or azure that is able to do
>>>>> partial reads of a blob. If you're using block blobs, then likely you
>>>>> would want to store blocks of your file as separate blocks of a blob, and
>>>>> then you can do partial data transfers that way. I could be
>>>>> misunderstanding the SDKs or how Azure stores data, but my guess is that
>>>>> a whole blob is retrieved and then the local file is able to support
>>>>> partial, block-based reads as you expect from local filesystems. You may
>>>>> be able to double check how much data is being retrieved by looking at
>>>>> where adlfs is mounting your blob storage.
>>>>>
>>>>> For Question 3:
>>>>> You can memory map remote files; it's just that every page fault will be
>>>>> even more expensive than for local files. I am not sure how to tell the
>>>>> dataset API to do memory mapping, and I'm not sure how well that would
>>>>> work over adlfs.
>>>>>
>>>>> For Question 4:
>>>>> Can you try using `pc.scalar(1000)` as shown in the first code excerpt
>>>>> in [1]:
>>>>>
>>>>> >>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>>>> >>> pc.multiply(x, y)
>>>>> <pyarrow.DoubleScalar: 72.54>
>>>>>
>>>>> [1]:
>>>>> https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>>
>>>>> On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi There,
>>>>>>
>>>>>> I have been experimenting with Tabular Datasets
>>>>>> <https://arrow.apache.org/docs/python/dataset.html> for data that can be
>>>>>> larger than memory, and I have a few questions about what's going on
>>>>>> under the hood and how to work with it (I understand it is still
>>>>>> experimental).
>>>>>>
>>>>>> *Question 1: Reading Data from Azure Blob Storage*
>>>>>> Now I know the filesystems don't fully support this yet, but there is an
>>>>>> fsspec-compatible library (adlfs), which is shown in the file system
>>>>>> example
>>>>>> <https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow>
>>>>>> and which I have used. Example below with the NYC taxi dataset, where I
>>>>>> am pulling the whole dataset through and writing it to disk in the
>>>>>> Feather format.
>>>>>>
>>>>>> import adlfs
>>>>>> import pyarrow.dataset as ds
>>>>>>
>>>>>> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
>>>>>>
>>>>>> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
>>>>>>
>>>>>> scanner = dataset.scanner()
>>>>>> ds.write_dataset(scanner, 'taxinyc/green/feather/', format='feather')
>>>>>>
>>>>>> This could be something on the Azure side, but I find I am being
>>>>>> bottlenecked on the download speed, and I have noticed that if I spin up
>>>>>> multiple Python sessions (or, in my case, interactive windows) I can
>>>>>> increase my throughput. Hence I can download each year of the taxi
>>>>>> dataset in a separate interactive window and increase the bandwidth
>>>>>> consumed. The tabular dataset
>>>>>> <https://arrow.apache.org/docs/python/dataset.html> documentation notes
>>>>>> 'optionally parallel reading.' Do you know how I can control this, or
>>>>>> perhaps control the number of concurrent connections? Or has this got
>>>>>> nothing to do with Arrow and sits purely on the Azure side? I have
>>>>>> increased the I/O thread count from the default 8 to 16 and saw no
>>>>>> difference, but could still spin up more interactive windows to maximise
>>>>>> bandwidth.
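>>>>>>
>>>>>> For reference, this is roughly how I am adjusting those settings (I am
>>>>>> assuming pa.set_io_thread_count is the relevant knob here):
>>>>>>
>>>>>> import pyarrow as pa
>>>>>>
>>>>>> # I/O threads gate how many reads Arrow issues concurrently (default 8)
>>>>>> pa.set_io_thread_count(16)
>>>>>>
>>>>>> # CPU threads gate decoding/compute parallelism
>>>>>> pa.set_cpu_count(16)
>>>>>>
>>>>>> # the scanner can also be asked explicitly to read with threads
>>>>>> scanner = dataset.scanner(use_threads=True)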
>>>>>>
>>>>>> *Question 2: Reading Filtered Data from Azure Blob Storage*
>>>>>> Unfortunately I don't quite have a repeatable example here. However, I
>>>>>> am using the same data as above, only this time with each year as a
>>>>>> Feather file instead of a parquet file, uploaded to my own Azure blob
>>>>>> storage account.
>>>>>> I am trying to read a subset of this data from blob storage by selecting
>>>>>> columns and filtering the data. The final result should be a dataframe
>>>>>> that takes up around 240 MB of memory (I have tested this by working
>>>>>> with the data locally). However, when I run this by connecting to the
>>>>>> Azure blob storage, it takes over an hour to run, and it's clear it's
>>>>>> downloading a lot more data than I would have thought. Given the file
>>>>>> format is Feather, which supports random access, I would have thought I
>>>>>> would only have to download the 240 MB?
>>>>>>
>>>>>> Is there more going on in the background? Perhaps I am using this
>>>>>> incorrectly?
>>>>>>
>>>>>> import adlfs
>>>>>> import pyarrow.dataset as ds
>>>>>>
>>>>>> connection_string = ''
>>>>>> fs = adlfs.AzureBlobFileSystem(connection_string=connection_string,)
>>>>>>
>>>>>> ds_f = ds.dataset("taxinyc/green/feather/", filesystem=fs, format='feather')
>>>>>>
>>>>>> df = (
>>>>>>     ds_f
>>>>>>     .scanner(
>>>>>>         columns={ # Selections and Projections
>>>>>>             'passengerCount': ds.field(('passengerCount'))*1000,
>>>>>>             'tripDistance': ds.field(('tripDistance'))
>>>>>>         },
>>>>>>         filter=(ds.field('vendorID') == 1)
>>>>>>     )
>>>>>>     .to_table()
>>>>>>     .to_pandas()
>>>>>> )
>>>>>>
>>>>>> df.info()
>>>>>>
>>>>>> *Question 3: How is memory mapping being applied?*
>>>>>> Does the Dataset API make use of memory mapping? Do I have the correct
>>>>>> understanding that memory mapping is only intended for dealing with
>>>>>> large data stored on a local file system, whereas data stored on a cloud
>>>>>> file system in the Feather format effectively cannot be memory mapped?
>>>>>>
>>>>>> *Question 4: Projections*
>>>>>> I noticed that in the scanner function, when projecting a column, I am
>>>>>> unable to use any compute functions (I get a TypeError: only other
>>>>>> expressions allowed as arguments), yet I am able to multiply the column
>>>>>> using standard Python arithmetic.
>>>>>>
>>>>>> 'passengerCount': ds.field(('passengerCount'))*1000,
>>>>>>
>>>>>> 'passengerCount': pc.multiply(ds.field(('passengerCount')),1000),
>>>>>>
>>>>>> Is this correct, or should I process this using an iterator via record
>>>>>> batches
>>>>>> <https://arrow.apache.org/docs/python/dataset.html#iterative-out-of-core-or-streaming-reads>
>>>>>> to do this out of core? Is it actually even doing it out of core with
>>>>>> "*1000"?
>>>>>>
>>>>>> Thanks for your help in advance. I have been following the Arrow project
>>>>>> for the last two years but have only recently decided to dive into it in
>>>>>> depth to explore it for various use cases. I am particularly interested
>>>>>> in out-of-core data processing and in interacting with cloud storage to
>>>>>> retrieve only a selection of data from Feather files. Hopefully at some
>>>>>> point, when I have enough knowledge, I can contribute to this amazing
>>>>>> project.
>>>>>>
>>>>>> Kind regards
>>>>>> Nikhil Makan
>>>>>>
>>>>>
