Yes, it already supports concurrent io when downloading or uploading data. You can configure it with an initial threshold over which it will switch to concurrent download/upload, and there are also options to configure how big each "part" is and how many concurrent tasks to allow at a single time.
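For anyone following along in Python rather than Julia, the download side of that idea looks roughly like the sketch below. This is illustrative only, not CloudStore.jl's actual API: the threshold, part size, and concurrency values are made up, and it assumes a plain object URL whose server reports Content-Length and honors Range requests.

import concurrent.futures
import requests

PART_SIZE = 8 * 1024 * 1024             # size of each "part" (illustrative default)
MULTIPART_THRESHOLD = 64 * 1024 * 1024  # below this, just do a single GET
MAX_CONCURRENCY = 8                     # how many parts to fetch at a single time

def download(url: str) -> bytes:
    # A HEAD request tells us the total object size via Content-Length
    size = int(requests.head(url, allow_redirects=True).headers["Content-Length"])
    if size < MULTIPART_THRESHOLD:
        return requests.get(url).content

    # Split the object into byte ranges and fetch them concurrently
    ranges = [(start, min(start + PART_SIZE, size) - 1)
              for start in range(0, size, PART_SIZE)]

    def fetch(byte_range):
        start, end = byte_range
        resp = requests.get(url, headers={"Range": f"bytes={start}-{end}"})
        resp.raise_for_status()
        return resp.content

    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
        parts = list(pool.map(fetch, ranges))
    return b"".join(parts)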
For downloads, we rely on doing an initial HEAD request on the object to get the total size (via the Content-Length header), then we do byte Range requests according to user-provided concurrency options. For uploads, we use the cloud-specific multipart upload APIs; so you upload/commit each part, then at the end you commit all the parts in a final step.

-Jacob

On Sun, Oct 9, 2022 at 6:28 PM Nikhil Makan <[email protected]> wrote:

> Thanks Jacob for the comments. Appreciate it.
>
> Out of interest, does the cloud storage interface you are working on for Julia support concurrent io in order to improve performance with respect to downloading/uploading data? I know for pyarrow with blob storage we have to use an fsspec-compliant library, which is adlfs. However, concurrent io is still in the works with that library.
>
> Kind regards
> Nikhil Makan
>
> On Wed, Oct 5, 2022 at 7:49 PM Jacob Quinn <[email protected]> wrote:
>
>> Sorry for the late reply, but thought I'd chime in with a thought or two, as I've had the chance to work on both the Arrow.jl Julia implementation and, more recently, a consistent cloud storage interface (for S3, Azure, and planned GCP, also for Julia; https://github.com/JuliaServices/CloudStore.jl).
>>
>> First, to clarify, all cloud storage providers support "partial" reads in the form of "ranged" HTTP requests (i.e. with a "Range" header like: "Range: bytes=0-9"). So that means for a single "object" in a cloud store, you could request specific byte ranges of that single object to be returned.
>>
>> How would that interact with stored data files? Well, for the Arrow IPC/Feather format, you could potentially do a series of these "range" requests to read a single Feather file's flatbuffer metadata, which contains the specific byte offsets of columns within the data file. So in theory, it should be fairly straightforward to apply a kind of "column selection" operation where only specific columns are actually downloaded from the cloud store, avoiding downloading the entire file.
>>
>> For other data formats? It's generally not as applicable, since we don't have this kind of specific byte information about where certain rows/columns live within a single object.
>>
>> On the other hand, the parquet format supports a partitioning scheme that *IS* more amenable to "partial reads", but in a slightly different way. Instead of using HTTP Range requests, specific columns or row batches of columns are stored as separate *objects* in the cloud store. And so by doing a "list" type of operation on all "objects" in the store, and reading the overall metadata of the parquet data, we could similarly do a "column selection" kind of operation by only downloading the specific *objects* from the cloud store that correspond to the desired columns.
>>
>> Hopefully that provides a little bit of clarity?
>>
>> This is somewhat the overall vision that we're working towards with the Julia implementation, to hopefully provide really efficient interop with cloud-stored data.
>>
>> -Jacob Quinn
>>
>> On Sun, Sep 18, 2022 at 7:47 PM Nikhil Makan <[email protected]> wrote:
>>
>>> Thanks Aldrin for the response on this.
>>>
>>> Question 1:
>>> For reference to anyone else who reads this, it appears adlfs does not support concurrent io and this is currently being developed: https://github.com/fsspec/adlfs/issues/268
>>>
>>> Question 2:
>>> Noted your points. I am using block blobs.
>>> If I am understanding you correctly, are you suggesting just splitting the data up into separate blobs? That way, if I filter the data, it only downloads the blobs that are required? This would seem to only work if you know beforehand what the filter could be, so you can split your data accordingly. However, if you wanted to return two columns of all the data, I assume this would still result in all the blobs being downloaded. I also came across this: https://learn.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-query-acceleration-how-to?tabs=python. However, this is only for csv/json files.
>>>
>>> Are you aware of partial reading that takes advantage of the columnar format (such as arrow/parquet) being implemented in any other storage services, such as Google Cloud Storage or Amazon S3? I know PyArrow has native support for GCS and S3, however I ran this test example against S3 and saw no real improvement. It seems to be the same issue where the whole file is downloaded.
>>>
>>> import pyarrow.dataset as ds
>>>
>>> ds_f = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/year=2019/month=1")
>>> ds_f.head(10)
>>>
>>> df = (
>>>     ds_f
>>>     .scanner(
>>>         columns={  # Selections and Projections
>>>             'passengerCount': ds.field('passengerCount'),
>>>         },
>>>     )
>>>     .to_table()
>>>     .to_pandas()
>>> )
>>> df.info()
>>>
>>> Question 3:
>>> Thanks, noted.
>>>
>>> Question 4:
>>> Tried this:
>>> 'passengerCount': pc.multiply(ds.field('passengerCount'), pa.scalar(1000))
>>> Same issue -> TypeError: only other expressions allowed as arguments
>>>
>>> However, it works with this:
>>> 'passengerCount': pc.multiply(ds.field('passengerCount'), pc.scalar(1000))
>>>
>>> This works as well, as noted previously, so I assume the Python operators are mapped across, similar to what happens when you use the operators against a numpy or pandas series: it just executes np.multiply or pd.multiply in the background.
>>> 'passengerCount': ds.field('passengerCount')*1000,
>>>
>>> Kind regards
>>> Nikhil Makan
>>>
>>> On Fri, Sep 16, 2022 at 12:28 PM Aldrin <[email protected]> wrote:
>>>
>>>> (oh, sorry I misread `pa.scalar` as `pc.scalar`, so please try `pyarrow.scalar` per the documentation)
>>>>
>>>> Aldrin Montana
>>>> Computer Science PhD Student
>>>> UC Santa Cruz
>>>>
>>>> On Thu, Sep 15, 2022 at 5:26 PM Aldrin <[email protected]> wrote:
>>>>
>>>>> For Question 2:
>>>>> At a glance, I don't see anything in adlfs or Azure that is able to do partial reads of a blob. If you're using block blobs, then likely you would want to store blocks of your file as separate blocks of a blob, and then you can do partial data transfers that way. I could be misunderstanding the SDKs or how Azure stores data, but my guess is that a whole blob is retrieved and then the local file is able to support partial, block-based reads as you expect from local filesystems. You may be able to double check how much data is being retrieved by looking at where adlfs is mounting your blob storage.
>>>>>
>>>>> For Question 3:
>>>>> You can memory map remote files, it's just that every page fault will be even more expensive than for local files. I am not sure how to tell the dataset API to do memory mapping, and I'm not sure how well that would work over adlfs.
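As a point of comparison for the memory-mapping question above, this is roughly what memory-mapped reading of a local Feather/Arrow IPC file looks like in Python. The path is a made-up placeholder and an uncompressed Feather v2/IPC file is assumed; how well this translates to a remote filesystem like adlfs is exactly the open question in the thread.

import pyarrow as pa
import pyarrow.ipc as ipc

# Map the file into memory instead of reading it eagerly (placeholder path)
source = pa.memory_map('taxinyc/green/feather/2019.feather', 'r')

# open_file parses only the footer metadata up front
reader = ipc.open_file(source)

# Column buffers can reference the mapping rather than being copied, where possible
table = reader.read_all()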
>>>>> For Question 4:
>>>>> Can you try using `pc.scalar(1000)` as shown in the first code excerpt in [1]:
>>>>>
>>>>> >> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>>>> >> pc.multiply(x, y)
>>>>> <pyarrow.DoubleScalar: 72.54>
>>>>>
>>>>> [1]: https://arrow.apache.org/docs/python/compute.html#standard-compute-functions
>>>>>
>>>>> Aldrin Montana
>>>>> Computer Science PhD Student
>>>>> UC Santa Cruz
>>>>>
>>>>> On Thu, Sep 8, 2022 at 8:26 PM Nikhil Makan <[email protected]> wrote:
>>>>>
>>>>>> Hi There,
>>>>>>
>>>>>> I have been experimenting with Tabular Datasets <https://arrow.apache.org/docs/python/dataset.html> for data that can be larger than memory and had a few questions related to what's going on under the hood and how to work with it (I understand it is still experimental).
>>>>>>
>>>>>> *Question 1: Reading Data from Azure Blob Storage*
>>>>>> Now I know the filesystems don't fully support this yet, but there is an fsspec-compatible library (adlfs) which is shown in the file system example <https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow>, which I have used. Example below with the nyc taxi dataset, where I am pulling the whole dataset through and writing it to disk in the feather format.
>>>>>>
>>>>>> import adlfs
>>>>>> import pyarrow.dataset as ds
>>>>>>
>>>>>> fs = adlfs.AzureBlobFileSystem(account_name='azureopendatastorage')
>>>>>>
>>>>>> dataset = ds.dataset('nyctlc/green/', filesystem=fs, format='parquet')
>>>>>>
>>>>>> scanner = dataset.scanner()
>>>>>> ds.write_dataset(scanner, 'taxinyc/green/feather/', format='feather')
>>>>>>
>>>>>> This could be something on the Azure side, but I find I am being bottlenecked on the download speed and have noticed that if I spin up multiple Python sessions (or in my case interactive windows) I can increase my throughput. Hence I can download each year of the taxinyc dataset in separate interactive windows and increase my bandwidth consumed. The tabular dataset <https://arrow.apache.org/docs/python/dataset.html> documentation notes 'optionally parallel reading.' Do you know how I can control this? Or perhaps control the number of concurrent connections? Or has this got nothing to do with Arrow and sits purely on the Azure side? I have increased the io thread count from the default 8 to 16 and saw no difference, but could still spin up more interactive windows to maximise bandwidth.
>>>>>>
>>>>>> *Question 2: Reading Filtered Data from Azure Blob Storage*
>>>>>> Unfortunately I don't quite have a repeatable example here. However, using the same data as above, only this time I have each year as a feather file instead of a parquet file. I have uploaded this to my own Azure blob storage account. I am trying to read a subset of this data from the blob storage by selecting columns and filtering the data. The final result should be a dataframe that takes up around 240 MB of memory (I have tested this by working with the data locally). However, when I run this by connecting to the Azure blob storage it takes over an hour to run, and it's clear it's downloading a lot more data than I would have thought. Given the file format is feather, which supports random access, I would have thought I would only have to download the 240 MB?
>>>>>> Is there more going on in the background? Perhaps I am using this incorrectly?
>>>>>>
>>>>>> import adlfs
>>>>>> import pyarrow.dataset as ds
>>>>>>
>>>>>> connection_string = ''
>>>>>> fs = adlfs.AzureBlobFileSystem(connection_string=connection_string)
>>>>>>
>>>>>> ds_f = ds.dataset("taxinyc/green/feather/", filesystem=fs, format='feather')
>>>>>>
>>>>>> df = (
>>>>>>     ds_f
>>>>>>     .scanner(
>>>>>>         columns={  # Selections and Projections
>>>>>>             'passengerCount': ds.field('passengerCount')*1000,
>>>>>>             'tripDistance': ds.field('tripDistance')
>>>>>>         },
>>>>>>         filter=(ds.field('vendorID') == 1)
>>>>>>     )
>>>>>>     .to_table()
>>>>>>     .to_pandas()
>>>>>> )
>>>>>>
>>>>>> df.info()
>>>>>>
>>>>>> *Question 3: How is memory mapping being applied?*
>>>>>> Does the Dataset API make use of memory mapping? Do I have the correct understanding that memory mapping is only intended for dealing with large data stored on a local file system, whereas data stored on a cloud file system in the feather format effectively cannot be memory mapped?
>>>>>>
>>>>>> *Question 4: Projections*
>>>>>> I noticed that in the scanner function, when projecting a column, I am unable to use any compute functions (I get a TypeError: only other expressions allowed as arguments), yet I am able to multiply the column using standard Python arithmetic.
>>>>>>
>>>>>> 'passengerCount': ds.field('passengerCount')*1000,
>>>>>> 'passengerCount': pc.multiply(ds.field('passengerCount'), 1000),
>>>>>>
>>>>>> Is this correct, or should I process this using an iterator via record batches <https://arrow.apache.org/docs/python/dataset.html#iterative-out-of-core-or-streaming-reads> to do this out of core? Is it actually even doing it out of core with "*1000"?
>>>>>>
>>>>>> Thanks for your help in advance. I have been following the Arrow project for the last two years but have only recently decided to dive into it in depth to explore it for various use cases. I am particularly interested in the out-of-core data processing and the interaction with cloud storage to retrieve only a selection of data from feather files. Hopefully at some point, when I have enough knowledge, I can contribute to this amazing project.
>>>>>>
>>>>>> Kind regards
>>>>>> Nikhil Makan
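To pull the Question 4 discussion above together in one place: the scanner's column projections are built from Expressions, and when a compute function is called on an Expression its other arguments must be Expressions too, so plain Python values get wrapped with pc.scalar() (pa.scalar() builds a Scalar, not an Expression, which is what triggers the "only other expressions allowed as arguments" TypeError). A minimal sketch, reusing the column names from the snippets earlier in the thread; the commented scanner call assumes a dataset object like ds_f from those snippets.

import pyarrow.dataset as ds
import pyarrow.compute as pc

columns = {
    # Compute function with an Expression-wrapped literal
    'passengerCount': pc.multiply(ds.field('passengerCount'), pc.scalar(1000)),
    # Plain column reference
    'tripDistance': ds.field('tripDistance'),
}

# Equivalent shorthand for the first projection, since Python operators on
# expressions also build expressions:
# 'passengerCount': ds.field('passengerCount') * 1000

# e.g. ds_f.scanner(columns=columns, filter=(ds.field('vendorID') == 1)).to_table()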
