On Mon, 4 Dec 2023 at 14:41 Luca Maurelli <[email protected]> wrote:
> Thank you @Jacek Pliszka <[email protected]> for your feedback!
> My answers are below.
>
> From: Jacek Pliszka <[email protected]>
> Sent: Friday, 1 December 2023 17:35
> To: [email protected]
> Subject: Re: Usage of Azure filesystem with fsspec and adlfs and pyarrow
> to download a list of blobs (parquets) concurrently with columns pruning
> and rows filtering
>
> Hi!
>
> These files seem to be below 4 MB, which is the default Azure block size.
> Possibly they are all read in full. Does someone know if, in the approach
> below, the blob is read only once from Azure even if multiple reads are
> called?
>
> Yes, they are below 4 MB each. I don't know the answer or its
> implications, so sadly I can't answer your question directly. If someone
> doesn't mind elaborating on it, I would be interested.
>
> Is there correlation between "my_index" and the filenames you could
> possibly use to avoid reading all of them?
>
> The index is a timestamp with type datetime64[ns] (from
> numpy/pandas/pyarrow AFAIK). The filenames are generated with the pattern
> "year/month/day/hour/asset_name", i.e. with a time granularity of 1 hour.
> We already generate the list of blobs to download so that it contains
> only the blobs that can hold data we are interested in (within the 1-hour
> granularity). The pyarrow filters, or post-download filtering, are only
> needed when we require a granularity finer than 1 hour, or ranges below
> it.

I am not sure you do - the filters the way you wrote them are applied
after the file is downloaded, so you download the files for all hours but
read only 08:00-08:30. You can download only the files for the 8th hour;
if half an hour is what you need, this means 24 times fewer files to read.

> Another suggestion would be merging the files - either yourself or using
> something like Delta Tables and vacuum/compact methods.
>
> We will assess this type of refactoring in the future, I believe. Can I
> do something with the current state of having many small files?

Others may give you better answers as I am not up to date, but some time
ago I used the Azure SDK directly and downloaded the small files into
memory buffers:

    # container is azure.storage.blob.ContainerClient
    content = container.download_blob(path).readall()
    buffer = pa.BufferReader(content)
    table = pyarrow.parquet.read_table(buffer, columns=columns, filters=...)

Then pa.concat_tables and combine_chunks.

I know it defeats the purpose of using adlfs and you have to parallelize
it yourself, but it worked well for me.

BR,

Jacek
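For what it's worth, a rough, untested sketch of the direct-SDK approach
described above, fanned out over a thread pool. The connection string,
container name, blob paths and column list are placeholders, not values
taken from this thread:

    import pyarrow as pa
    import pyarrow.parquet as pq
    from concurrent.futures import ThreadPoolExecutor
    from azure.storage.blob import ContainerClient

    # Placeholder connection details; the client is already scoped to the
    # container, so blob names do not include the "raw" prefix.
    container = ContainerClient.from_connection_string(
        "my_connection_string", "raw"
    )
    PATHS = [f"2023/11/08/{hour:02d}/file.parquet" for hour in range(24)]
    COLUMNS = ["col1", "col2", "col3"]

    def fetch(path: str) -> pa.Table:
        # Each blob is only a few MB, so download it fully into memory
        # and parse it there.
        content = container.download_blob(path).readall()
        return pq.read_table(pa.BufferReader(content), columns=COLUMNS)

    # One blob per task; the files are small, so a fairly wide pool is cheap.
    with ThreadPoolExecutor(max_workers=16) as pool:
        tables = list(pool.map(fetch, PATHS))

    df = pa.concat_tables(tables).combine_chunks().to_pandas()

Row filtering on my_index can then be applied to the concatenated table or
the resulting dataframe, but as noted above it is usually cheaper to simply
not list the hours you do not need.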
> On Fri, 1 Dec 2023 at 15:46 Weston Pace <[email protected]> wrote:
>
> > Those files are quite small. For every single file pyarrow is going to
> > need to read the metadata, determine which columns to read (column
> > filtering), determine if any of the rows need to be read (using row
> > filtering) and then actually issue the read. If you combined all those
> > files into one file then I would expect better performance. The ability
> > to read a single file in parallel is not going to be important here
> > (each file is very small). However, you will want to make sure it is
> > reading multiple files at once. I would expect that it is doing so, but
> > this would be a good thing to verify if you can.
> >
> > One quick test you can always try is to run your script twice, at the
> > same time. If the total runtime is significantly faster than running
> > the script twice, one after the other, then you can confirm that there
> > are unexploited resources on the system.
> >
> > It also looks like your data is partitioned by time and your filters
> > are time-based filters. You might want to investigate dataset
> > partitioning, as that should be able to help.
> >
> > On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli <[email protected]>
> > wrote:
> >
> > I'm new to these libraries, so bear with me, I am learning a lot these
> > days.
> >
> > I started using fsspec and adlfs with the idea of switching between a
> > cloud storage and a local storage with little effort. I read that adlfs
> > makes use of the Azure Blob Storage Python SDK, which supports the
> > async/await pattern to implement concurrent IO.
> > The Python SDK also exposes a max_concurrency argument, for instance in
> > the download_blob function, to enable downloading a single blob with a
> > thread pool (note: a single blob; I believe the use case is that if the
> > blob is very big you can split its download across threads with this
> > argument).
> >
> > Now I wish to use adlfs with pyarrow/pandas to download a list of blobs
> > (parquet) by exploiting the async methods of the Python SDK. Not
> > knowing the libraries and their integration, I hope this is already
> > taken care of, so I tried the following snippet:
> >
> >     import pandas as pd
> >     import pyarrow.parquet as pq
> >     import adlfs
> >     import time
> >
> >     CONNECTION_STRING = "my_connection_string"
> >     CONTAINER = "raw"
> >     FILEPATHS = [
> >         f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}/file.parquet"
> >         for day in range(1, 31)
> >         for hour in range(24)
> >     ]
> >
> >     fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
> >
> >     FILTERS = [
> >         [
> >             ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
> >             ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
> >         ]
> >     ]
> >     COLUMNS = ["col1", "col2", "col3"]
> >
> >     start_time = time.time()
> >     dataset = pq.ParquetDataset(
> >         path_or_paths=FILEPATHS,
> >         filters=FILTERS,
> >         filesystem=fs,
> >     )
> >     elapsed_time = time.time() - start_time
> >     print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds")
> >
> >     start_time = time.time()
> >     df = dataset.read_pandas(
> >         columns=COLUMNS
> >     ).to_pandas()
> >     elapsed_time = time.time() - start_time
> >     print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds")
> >
> > Each blob has around 3600 rows and 95 columns. The snippet tries to
> > download 720 blobs in total. The final dataframe is 236404 rows x 95
> > columns with no column/row filtering.
> > If I enforce the column pruning, it has 236404 rows x 3 columns
> > (CASE 1). If I also enforce the row filtering, it has 1544 rows x 95
> > columns (CASE 2).
> >
> > The timing of the cases is as follows:
> >
> > CASE 1:
> >     # Elapsed time for ParquetDataset: 0.886232 seconds
> >     # Elapsed time for read_pandas: 146.798920 seconds
> >
> > CASE 2:
> >     # Elapsed time for ParquetDataset: 0.298594 seconds
> >     # Elapsed time for read_pandas: 203.801083 seconds
> >
> > I was expecting case 1 to be faster, since from the timestamp only the
> > first blob should actually be downloaded and read (AFAIK parquet is
> > smart and makes use of schema/metadata for row/column filtering).
> > I was also expecting case 2 to be faster in general: this is just a
> > feeling (maybe I was expecting more from concurrent/parallel IO?).
> >
> > My question: can I do something better w.r.t. performance here? The
> > parquet files are really small compared to other online examples
> > dealing with parquet files. Maybe I can tweak some pyarrow arguments?
> > Thank you,
> > Luca
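Regarding the dataset partitioning suggestion above: with pyarrow.dataset
the existing year/month/day/hour directory layout can be declared as a
plain (non-hive) directory partitioning, so a filter on those fields
prunes blobs before anything is downloaded. A rough, untested sketch, with
the field names, container name and connection string as placeholders, and
assuming every blob in the container follows that layout:

    import adlfs
    import pyarrow as pa
    import pyarrow.dataset as ds

    fs = adlfs.AzureBlobFileSystem(connection_string="my_connection_string")

    # Describe the directory levels 2023/11/08/08/... as partition fields.
    part = ds.partitioning(
        pa.schema([
            ("year", pa.int16()),
            ("month", pa.int8()),
            ("day", pa.int8()),
            ("hour", pa.int8()),
        ]),
        flavor=None,  # plain directory partitioning, not hive-style key=value
    )

    dataset = ds.dataset(
        "raw",                    # container; discovery lists all blobs in it
        filesystem=fs,
        format="parquet",
        partitioning=part,
    )

    # The predicate on the partition fields is resolved from the directory
    # names, so blobs outside 2023-11-08 hour 08 are never downloaded.
    table = dataset.to_table(
        columns=["col1", "col2", "col3"],
        filter=(
            (ds.field("year") == 2023)
            & (ds.field("month") == 11)
            & (ds.field("day") == 8)
            & (ds.field("hour") == 8)
        ),
    )
    df = table.to_pandas()

A finer predicate on my_index can be added to the same filter expression;
the dataset scanner can then use the parquet row-group statistics to skip
data inside the files that do get read.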

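And on the suggestion of merging the many small files into larger ones:
once a day or month of blobs fits in memory, it can be read in one go and
written back as a single larger parquet blob through the same adlfs
filesystem. A minimal, untested sketch with placeholder paths:

    import adlfs
    import pyarrow.parquet as pq

    fs = adlfs.AzureBlobFileSystem(connection_string="my_connection_string")

    # Read one month of small hourly files in one go (must fit in memory) ...
    month = pq.read_table("raw/2023/11", filesystem=fs)

    # ... and write it back as a single, larger parquet blob.
    pq.write_table(month, "raw/monthly/2023-11.parquet", filesystem=fs)

This is essentially what the Delta Table compact/vacuum suggestion above
would automate.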