Thank you @Weston Pace<mailto:[email protected]> for your feedback.
Below my answers in red: From: Weston Pace <[email protected]> Sent: venerdì 1 dicembre 2023 15:46 To: [email protected] Subject: Re: Usage of Azure filesystem with fsspec and adlfs and pyarrow to download a list of blobs (parquets) concurrently with columns pruning and rows filtering Those files are quite small. 1. Yes, these files as stated are small compared to the use cases seen online, at least this is my feeling. By inspecting some random files, they range from 400 to 600 KiB each as an order of magnitude. For every single file pyarrow is going to need to read the metadata, determine which columns to read (column filtering), determine if any of the rows need to be read (using row filtering) and then actually issue the read. If you combined all those files into one file then I would expect better performance. 1. Yes, this reasoning is a type of refactoring that we will investigate better but require a major redesign and processing of historical files. We will try to assess the priority and costs of it. The ability to read a single file in parallel is not going to be important here (each file is very small). However, you will want to make sure it is reading multiple files at once. I would expect that it is doing so but this would be a good thing to verify if you can. One quick test you can always try is to run your script twice, at the same time. If the total runtime is significantly faster than running the script twice, one after the other, then you can confirm that there are unexploited resources on the system. 1. This was my concern, indeed. I can run the tests as you mentioned to assess the use of resources and see if they are downloaded in parallel. 2. However, I am also interested in the right use of the pyarrow library to verify the concurrency of multiple IO. Is pyarrow implementing this logic or is it derived from the fsspec integration? I’m asking this to know if I should ask here or from the fsspec developers, instead. It also looks like your data is partitioned by time and your filters are time based filters. You might want to investigate dataset partitioning as that should be able to help. 1. My data are time-series information where the timestamp of the value is used as indexing. The columns are different time-series related to the same asset (the blob file) and they are grouped with a granularity of 1 hour now. Do you believe portioning with such small files might be beneficial actually? I am worried that the overhead of the portioning would be higher that then pros, but it is just my personal feeling about it. 1. My other question is if I can do something by tweaking pyarrow arguments or options for my use case of small files without the refactor of point 1). Thank you, Luca On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli <[email protected]<mailto:[email protected]>> wrote: I’m new to these libraries so bear with me, I am learning a lot these days. I started using fsspec and adlfs with the idea of switching between a cloud storage to a local storage with little effort. I read that adlfs makes use of the Azure Blob Storage Python SDK which supports the use of async/await pattern to implement concurrent IO. The Python SDK also exposes the max_concurrency argument in the download_blob function, for instance, to enable the download of a single blob with a thread pool (note: the single blob, I believe the use case here is that if the blob is very big you can split the download in parallel with this argument). Now I wish to use adlfs with pyarrow/pandas to download a list of blobs (parquet) by exploiting the async methods of the Python SDK. Not knowing the libraries and their integration, I hope this is already taken care of, so I tried to code the following snippet: import pandas as pd import pyarrow.parquet as pq import adlfs import time CONNECTION_STRING = "my_connection_string" CONTAINER = "raw" FILEPATHS = [ f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}/file.parquet" for day in range(1, 31) for hour in range(24) ] fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING) FILTERS = [ [ ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")), ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")), ] ] COLUMNS = ["col1", "col2", "col3"] start_time = time.time() dataset = pq.ParquetDataset( path_or_paths=FILEPATHS, filters=FILTERS, filesystem=fs, ) elapsed_time = time.time() - start_time print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds") start_time = time.time() df = dataset.read_pandas( columns=COLUMNS ).to_pandas() elapsed_time = time.time() - start_time print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds") Each blob has around 3600 rows and 95 columns. It tries to download 720 blobs in total. The final dataframe is 236404 rows x 95 columns with no columns/rows filtering. If I enforce the columns pruning, it has 236404 rows x 3 columns (CASE 1). If I also enforce the rows filtering, it has 1544 rows x 95 columns (CASE 2). The timing of the cases is as follows: 1. # Elapsed time for ParquetDataset: 0.886232 seconds # Elapsed time for read_pandas: 146.798920 seconds 1. # Elapsed time for ParquetDataset: 0.298594 seconds # Elapsed time for read_pandas: 203.801083 seconds I was expecting the case 1 to be faster since from the timestamp only the first blob should be actually downloaded and read (AFAIK parquet is smart and it makes use of schema/metadata for the rows/columns filtering). I also was expecting case 2 to be faster in general: this is just a feeling (maybe I was expecting more from concurrent/parallel IO?). My question: Can I do something better w.r.t performances here? The parquet files are really smalls compared to other online examples of dealing with parquet files. Maybe I can tweak some pyarrow arguments? Thank you, Luca Luca Maurelli Data Scientist __________________________ [Camozzi Digital s.r.l.] Camozzi Digital s.r.l. Via Cassala 52 25126 Brescia (BS) ITALY Phone: Fax: Mobile: Email: [email protected]<mailto:[email protected]> Website: www.camozzidigital.com<http://www.camozzidigital.com/> Questo messaggio di posta elettronica, comprensivo di eventuali allegati, è ad uso esclusivo di colui al quale è indirizzato e potrebbe contenere informazioni riservate. Nel caso in cui abbiate ricevuto questa comunicazione per errore, Vi invitiamo cortesemente a darcene notizia - contattando il mittente o telefonando al numero (+ 39) 030.37921 - ed a distruggere e cancellare il messaggio dal Vostro computer. This e-mail message, including any attachments, is for the exclusive use of the person to whom it is addressed and may contain confidential information. In case you are not the intended recipient, we kindly ask you to inform us - by contacting the sender or by calling the number (+ 39) 030.37921 - and to destroy and delete the message from your computer.
