Mon, 4 Dec 2023 at 14:41 Luca Maurelli <[email protected]> wrote:

> Thank you @Jacek Pliszka <[email protected]> for your feedback!
> My answers are below:
>
>
>
> *From:* Jacek Pliszka <[email protected]>
> *Sent:* Friday, 1 December 2023 17:35
> *To:* [email protected]
> *Subject:* Re: Usage of Azure filesystem with fsspec and adlfs and
> pyarrow to download a list of blobs (parquets) concurrently with columns
> pruning and rows filtering
>
>
>
> Hi!
>
>
>
> These files seem to be below 4 MB, which is the default Azure block size.
> Possibly they are all read in full. Does someone know whether, in the
> approach below, the blob is read from Azure only once even if multiple
> reads are issued?
>
> Yes, they are below 4 MB each. I don’t know the answer or its implications,
> so sadly I can’t answer your question directly. If someone doesn’t mind
> elaborating on it, I would be interested.
>
>
>
> Is there a correlation between "my_index" and the filenames that you could
> use to avoid reading all of them?
>
> The index is a timestamp of type datetime64[ns] (from numpy/pandas/pyarrow,
> AFAIK). The filenames are generated with the pattern
> “year/month/day/hour/asset_name”, so with a time granularity of 1 hour. We
> currently generate the list of blobs to download with logic that selects
> only the blobs that could contain data we are interested in (i.e., within
> that 1-hour granularity). The pyarrow filters, or filtering after the
> download, are only needed when we require a granularity finer than 1 hour
> or ranges smaller than that.
>

I am not sure you do - the filters, as you wrote them, are applied after
the file is downloaded, so you download the files for all hours even though
you only read 8:00-8:30 - you could download only the files for the 8th
hour (see the sketch below). If a half-hour window is what you typically
query, that means 24 times fewer files to read.
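
A minimal sketch of that idea, assuming the hour-level naming scheme
described above (the container name, file name and window bounds are just
placeholders):

import pandas as pd

start = pd.Timestamp("2023-11-08 08:00:00")
end = pd.Timestamp("2023-11-08 08:30:00")

# enumerate only the hour "buckets" that overlap [start, end]
hours = pd.date_range(start.floor("h"), end.floor("h"), freq="h")
FILEPATHS = [
    f"az://raw/{t.year}/{t.month:02d}/{t.day:02d}/{t.hour:02d}/file.parquet"
    for t in hours
]

The pyarrow filters then only have to trim the partial hours at the edges
of the window.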


> Another suggestion would be merging the files - either yourself or using
> something like Delta Tables and vacuum/compact methods.
>
> We will probably assess this type of refactoring in the future. In the
> meantime, is there anything I can do given the current state of many small
> files?
>
>
Others may give you better answers as I am not up to date, but some time ago
I used the Azure SDK directly and downloaded small files into an in-memory
buffer:

import pyarrow as pa
import pyarrow.parquet as pq

# container is an azure.storage.blob.ContainerClient
content = container.download_blob(path).readall()
buffer = pa.BufferReader(content)
table = pq.read_table(buffer, columns=columns, filters=...)

Then pa.concat_tables and combine_chunks.

I know it defeats the purpose of using adlfs and you have to parallelize it
yourself, but it worked well for me (a rough sketch of that is below).
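
For what it is worth, a rough sketch of that pattern with hand-rolled
parallelism (a plain ThreadPoolExecutor; the connection string, blob names,
columns and filter bounds are placeholders taken from the thread, not a
tested implementation):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor
from azure.storage.blob import ContainerClient

container = ContainerClient.from_connection_string("my_connection_string", "raw")
paths = [f"2023/11/08/{hour:02d}/file.parquet" for hour in range(24)]  # blob names inside the container

def read_one(path):
    # one download per small blob, straight into memory
    content = container.download_blob(path).readall()
    return pq.read_table(
        pa.BufferReader(content),
        columns=["col1", "col2", "col3"],
        filters=[
            ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
            ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
        ],
    )

with ThreadPoolExecutor(max_workers=16) as pool:
    tables = list(pool.map(read_one, paths))

table = pa.concat_tables(tables).combine_chunks()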

BR,

Jacek


>
> Fri, 1 Dec 2023 at 15:46 Weston Pace <[email protected]> wrote:
>
> Those files are quite small.  For every single file pyarrow is going to
> need to read the metadata, determine which columns to read (column
> filtering), determine if any of the rows need to be read (using row
> filtering) and then actually issue the read.  If you combined all those
> files into one file then I would expect better performance.  The ability to
> read a single file in parallel is not going to be important here (each file
> is very small).  However, you will want to make sure it is reading multiple
> files at once.  I would expect that it is doing so but this would be a good
> thing to verify if you can.
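>
> For the combining step, one hedged option is pyarrow's dataset API itself
> (the output path and the idea of one file per month are just an
> illustration): read the existing small files once and rewrite them as a
> single larger file.
>
> import pyarrow.dataset as ds
> import pyarrow.parquet as pq
>
> # FILEPATHS and fs as in the original snippet further down the thread
> table = ds.dataset(FILEPATHS, filesystem=fs, format="parquet").to_table()
> pq.write_table(table, "az://raw/2023/11/combined.parquet", filesystem=fs)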
>
>
>
> One quick test you can always try is to run your script twice, at the same
> time. If the total runtime is significantly lower than running the script
> twice, one after the other, then you can confirm that there are unexploited
> resources on the system.
>
>
>
> It also looks like your data is partitioned by time and your filters are
> time based filters.  You might want to investigate dataset partitioning as
> that should be able to help.
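>
> A hedged sketch of that direction with the pyarrow dataset API (the
> partition field names/types and the base path are assumptions based on the
> year/month/day/hour layout mentioned earlier in the thread):
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> part = ds.partitioning(
>     pa.schema([("year", pa.int32()), ("month", pa.int32()),
>                ("day", pa.int32()), ("hour", pa.int32())])
> )
> dataset = ds.dataset("raw", filesystem=fs, format="parquet", partitioning=part)
> # a filter on partition fields prunes whole directories before any download
> table = dataset.to_table(
>     columns=["col1", "col2", "col3"],
>     filter=(ds.field("month") == 11) & (ds.field("day") == 8) & (ds.field("hour") == 8),
> )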
>
>
>
> On Fri, Dec 1, 2023 at 6:32 AM Luca Maurelli <[email protected]>
> wrote:
>
> I’m new to these libraries, so bear with me; I am learning a lot these days.
>
>
>
> I started using fsspec and adlfs with the idea of switching between cloud
> storage and local storage with little effort. I read that adlfs makes use
> of the Azure Blob Storage Python SDK, which supports the async/await
> pattern for concurrent IO.
>
> The Python SDK also exposes the max_concurrency argument in the
> download_blob function, for instance, to enable downloading a single blob
> with a thread pool (note: a single blob - I believe the use case is that if
> the blob is very big you can split its download in parallel with this
> argument).
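>
> For reference, a minimal sketch of that single-blob option with the Azure
> SDK (the connection string, container and blob name are placeholders):
>
> from azure.storage.blob import BlobClient
>
> blob = BlobClient.from_connection_string(
>     "my_connection_string",
>     container_name="raw",
>     blob_name="2023/11/08/08/file.parquet",
> )
> # max_concurrency splits the download of this one blob across 4 threads
> data = blob.download_blob(max_concurrency=4).readall()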
>
>
>
> Now I wish to use adlfs with pyarrow/pandas to download a list of blobs
> (parquet files) by exploiting the async methods of the Python SDK. Not
> knowing the libraries and their integration, I hoped this was already taken
> care of, so I tried the following snippet:
>
>
>
> import pandas as pd
> import pyarrow.parquet as pq
> import adlfs
> import time
>
> CONNECTION_STRING = "my_connection_string"
> CONTAINER = "raw"
> FILEPATHS = [
>     f"az://{CONTAINER}/2023/11/{str(day).zfill(2)}/{str(hour).zfill(2)}/file.parquet"
>     for day in range(1, 31)
>     for hour in range(24)
> ]
>
> fs = adlfs.AzureBlobFileSystem(connection_string=CONNECTION_STRING)
>
> FILTERS = [
>     [
>         ("my_index", ">=", pd.Timestamp("2023-11-08 08:00:00")),
>         ("my_index", "<=", pd.Timestamp("2023-11-08 08:30:00")),
>     ]
> ]
> COLUMNS = ["col1", "col2", "col3"]
>
> start_time = time.time()
> dataset = pq.ParquetDataset(
>     path_or_paths=FILEPATHS,
>     filters=FILTERS,
>     filesystem=fs,
> )
> elapsed_time = time.time() - start_time
> print(f"Elapsed time for ParquetDataset: {elapsed_time:.6f} seconds")
>
> start_time = time.time()
> df = dataset.read_pandas(
>     columns=COLUMNS
> ).to_pandas()
> elapsed_time = time.time() - start_time
> print(f"Elapsed time for read_pandas: {elapsed_time:.6f} seconds")
>
> Each blob has around 3600 rows and 95 columns. The script tries to download
> 720 blobs in total. The final dataframe is 236404 rows x 95 columns with no
> column/row filtering.
>
> If I enforce the column pruning, it has 236404 rows x 3 columns (CASE 1).
> If I enforce the row filtering, it has 1544 rows x 95 columns (CASE 2).
>
>
>
> The timing of the two cases is as follows:
>
>    1. # Elapsed time for ParquetDataset: 0.886232 seconds
>       # Elapsed time for read_pandas: 146.798920 seconds
>
>    2. # Elapsed time for ParquetDataset: 0.298594 seconds
>       # Elapsed time for read_pandas: 203.801083 seconds
>
>
>
> I was expecting case 1 to be faster, since from the timestamp filter only
> the first blob should actually be downloaded and read (AFAIK parquet is
> smart and makes use of schema/metadata for row/column filtering).
>
> I was also expecting case 2 to be faster in general: this is just a feeling
> (maybe I was expecting more from concurrent/parallel IO?).
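>
> One way to sanity-check that expectation is to look at the footer
> statistics that pyarrow uses for pruning (a sketch; the path and the
> assumption that "my_index" is the first physical column are illustrative):
>
> import pyarrow.parquet as pq
>
> md = pq.read_metadata("az://raw/2023/11/08/08/file.parquet", filesystem=fs)
> stats = md.row_group(0).column(0).statistics  # assuming "my_index" is column 0
> print(stats.min, stats.max)  # min/max bounds used for row-group skipping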
>
>
>
> My question: can I do something to improve performance here? The parquet
> files are really small compared to other online examples of working with
> parquet files. Maybe I can tweak some pyarrow arguments?
>
>
>
> Thank you,
>
> Luca
>
>
>
> *Luca Maurelli*
> Data Scientist
>
> *Camozzi Digital s.r.l.*
> Via Cassala 52, 25126 Brescia (BS), ITALY
>
> *Email:* [email protected]
> *Website:* www.camozzidigital.com
>
>
>
>
>
