Awesome.

# Partitioning (src/arrow/dataset/partition.h)

The first place to look is the Partitioning class.  A Partitioning (e.g.
hive partitioning, directory partitioning, filename partitioning) has two
main methods that convert between a path (e.g.
"/state_code=11/city_code=106/chunk-0.parquet") and an expression (e.g.
field_ref("state_code") == 11 && field_ref("city_code") == 106).

virtual Result<compute::Expression> Parse(const std::string& path) const = 0;
virtual Result<PartitionPathFormat> Format(const compute::Expression& expr) const = 0;

We use expressions instead of something simpler like a dictionary of
key/value pairs.  I believe the intention was to leave the door open for
unique partitionings that might map something like "temp=medium" to an
expression like 30 < field_ref("temp") < 60, but in practice these
expressions are always a collection of equality expressions AND'd together.

One thing I'm fairly certain should work, but we might want to verify (and
potentially add unit tests), is that hive & directory partitioning can
correctly convert directories (e.g. "/state_code=11") into expressions and
that we aren't relying on full paths.
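
I believe the pyarrow bindings already expose Partitioning.parse, so a
sketch of such a check could look like the following (the expected outputs
are assumptions, exactly what the unit test should pin down):

import pyarrow as pa
import pyarrow.dataset as ds

partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
                                              pa.field('city_code', pa.int32())]))

# Full directory path -> conjunction of equality expressions
print(partitioning.parse('/state_code=11/city_code=106'))
# expected: ((state_code == 11) and (city_code == 106))

# Partial path (just the first directory level) -> a single equality
# expression.  This is the behavior we'd want to verify.
print(partitioning.parse('/state_code=11'))
# expected: (state_code == 11)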

# Discovery (src/arrow/dataset/discovery.h)

The next class is DatasetFactory; specifically, we are probably interested
in FileSystemDatasetFactory.  FileSystemDatasetFactory scans a directory,
discovering files, and it can also discover a partitioning.  This is all
usually transparent in pyarrow, but there are bindings and you can use the
class explicitly:

import pyarrow.dataset as ds
import pyarrow.fs as fs
local_fs = fs.LocalFileSystem()
format = ds.ParquetFileFormat()
opts = ds.FileSystemFactoryOptions(partitioning=ds.HivePartitioning.discover())
factory = ds.FileSystemDatasetFactory(
    local_fs, fs.FileSelector('/tmp/my_dataset', recursive=True), format, opts)
sch = factory.inspect()
my_dataset = factory.finish(sch)
print(my_dataset.partitioning.schema)
# state_code: int32
# city_code: int32

Although, in practice, we usually would write something much shorter:

import pyarrow.dataset as ds
# This one line will expand to everything above
my_dataset = ds.dataset('/tmp/my_dataset', partitioning='hive')

If, however, we want to apply a filter while we are discovering the
dataset, then we cannot rely on dataset discovery to also discover our
partitioning.  We will have to specify it manually (or you could run
dataset discovery once to discover the partitioning, and then reuse that
discovered partitioning on subsequent runs).  So I think, in the end, we
want to be able to support something like this:

import pyarrow as pa
import pyarrow.dataset as ds

partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
                                              pa.field('city_code', pa.int32())]))
filter = (ds.field('state_code') == 31) & (ds.field('city_code') == 6200)
my_dataset = ds.dataset('/tmp/my_dataset', partitioning=partitioning, filter=filter)

This will require making changes to FileSystemDatasetFactory.  We will want
to add "filter" to FileSystemFactoryOptions...

struct FileSystemFactoryOptions {
  PartitioningOrFactory partitioning{Partitioning::Default()};
  std::string partition_base_dir;
  bool exclude_invalid_files = false;
  std::vector<std::string> selector_ignore_prefixes = {".", "_",};
  // `compute::literal(true)` is often used as a "default" value for a filter
  compute::Expression filter = compute::literal(true);
};

* If a filter is specified (i.e. it is not literal(true)) then partitioning
MUST be a Partitioning (and not a PartitioningFactory).

Then we need to modify the discovery process itself to only list files that
match the filter.  Unfortunately, that might end up being a little tricky.
The code right now is simply...

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));

So FileSystemDatasetFactory is not actually doing the recursive directory
walking logic.  Instead it is just asking for all files matching a
selector.  Maybe GetFileInfo could be extended to take an optional
predicate which will be applied to directories before entering them.
Although this might not be straightforward for remote filesystem
implementations (e.g. S3, GCS) that don't actually "walk" a directory but
instead do simple prefix matching.  Another option could be to extend
Partitioning so that it could take in an expression and return a prefix.
GetFileInfo could then be extended to only return files that match the
given prefix.
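
To get a feel for the savings, here is a rough user-level sketch of that
pruning idea in plain Python.  This is not the proposed C++ change; the
list_matching_files helper and the `wanted` dict are made up for
illustration, and stand in for simplifying the filter against each
directory's partition expression:

import pyarrow.fs as fs

# Hypothetical: the filter reduced to required partition values.
wanted = {'state_code': '31', 'city_code': '6200'}

def list_matching_files(filesystem, base_dir, wanted):
    """List files under base_dir, pruning directories whose key=value
    name contradicts the wanted partition values."""
    out = []
    selector = fs.FileSelector(base_dir, recursive=False)
    for info in filesystem.get_file_info(selector):
        if info.type == fs.FileType.Directory:
            key, sep, value = info.base_name.partition('=')
            if sep and key in wanted and wanted[key] != value:
                continue  # skip this whole subtree
            out.extend(list_matching_files(filesystem, info.path, wanted))
        else:
            out.append(info.path)
    return out

files = list_matching_files(fs.LocalFileSystem(), '/tmp/my_dataset', wanted)

The resulting list of paths could then be handed to ds.dataset(...) as a
stop-gap today; the proposed change would effectively move this pruning
inside the factory (or into GetFileInfo via a predicate or prefix).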

Finally, you'd want to create some kind of benchmark and/or unit tests to
justify the change and prevent regression.

A simple python benchmark (using today's code without a filter) could be:

import os
import tempfile
import time

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

simple_table = pa.Table.from_pydict({'x': [1, 2, 3]})

with tempfile.TemporaryDirectory() as tempdir:

    for state_code in range(100):
        for city_code in range(100):
            pq_dir = os.path.join(tempdir, f'state_code={state_code}',
                                  f'city_code={city_code}')
            os.makedirs(pq_dir)
            pq_path = os.path.join(pq_dir, 'chunk-0.parquet')
            pq.write_table(simple_table, pq_path)

    start = time.time()
    partitioning = ds.HivePartitioning(pa.schema([pa.field('state_code', pa.int32()),
                                                  pa.field('city_code', pa.int32())]))
    my_dataset = ds.dataset(tempdir, partitioning=partitioning)
    end = time.time()
    print(f'Elapsed: {end - start}s')
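
It might also be worth timing the second step (applying a filter to the
already-discovered fragments), since that is what Approach 2 below targets.
For example, a hypothetical continuation inside the same with-block (the
partition values are arbitrary):

    filt = (ds.field('state_code') == 31) & (ds.field('city_code') == 62)
    start = time.time()
    num_matching = len(list(my_dataset.get_fragments(filter=filt)))
    end = time.time()
    print(f'Filter elapsed: {end - start}s ({num_matching} matching fragments)')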

# Approach 2, faster filtering

There is an entirely different approach that could be taken which wouldn't
speed up the discovery at all, but should speed up the filtering.  In this
approach you could modify FileSystemDataset so that, instead of storing a
flat list of FileFragment objects, it stored a tree of FileFragment
objects.  Each node of the tree would have its own partitioning
expression.  Then, GetFragments(predicate) could walk the tree (in DFS
order), skipping entire nodes that fail the predicate.
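
A minimal sketch of that idea (FragmentNode and the dict-based pruning
check are made up for illustration; the real implementation would simplify
each node's partition expression against the predicate):

class FragmentNode:
    """Hypothetical tree node: the partition key/value pinned at this
    directory level, plus child nodes and/or leaf file fragments."""
    def __init__(self, key=None, value=None, children=(), fragments=()):
        self.key, self.value = key, value
        self.children = list(children)
        self.fragments = list(fragments)

def collect_fragments(node, wanted):
    """DFS that skips whole subtrees whose pinned partition value
    contradicts the requested key/value pairs."""
    if node.key is not None and node.key in wanted and wanted[node.key] != node.value:
        return []  # prune: nothing below this node can match
    out = list(node.fragments)
    for child in node.children:
        out.extend(collect_fragments(child, wanted))
    return out

# With 27 state_code nodes each holding their city_code children, a filter
# on one state + one city does roughly 27 + 5560 comparisons instead of
# comparing the partition expression of all ~150k fragments.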

On Thu, Aug 4, 2022 at 9:54 AM Tomaz Maia Suller <[email protected]>
wrote:

> Weston, I'm interested in following up.
>
>
> ------------------------------
> *From:* Weston Pace <[email protected]>
> *Sent:* Thursday, August 4, 2022 12:15
> *To:* [email protected] <[email protected]>
> *Subject:* Re: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
>
> There is a lot of room for improvement here.  In the datasets API the call
> that you have described (read_parquet) is broken into two steps:
>
>  * dataset discovery
>
> During dataset discovery we don't use any partition filter.  The goal is
> to create the "total dataset" of all the files.  So in your case this means
> listing out all 150,120 directories.  For every file we discover we capture
> a partition expression for this file.  This is probably where the bulk of
> time is being spent (listing the directories).
>
>  * dataset read
>
> During the dataset read we apply the partition filter.  So we are going to
> iterate through all ~150k files and compare the filter expression with the
> previously captured partition expression, eliminating files that don't
> match.  In this phase we don't have any idea of the original directory
> structure.  So instead of performing 27 top-level comparisons + 5560
> second-level comparisons we end up having to calculate all 150k comparisons.
>
> Both of these steps are considerably longer than they need to be.  If I
> were to guess I would guess that a majority of the time is spent in that
> first step but I don't think the time spent in that second step is
> negligible.
>
> One fairly straightforward solution would be to allow the partition filter
> to be used during dataset discovery.  This would yield a much smaller
> dataset so step 2 would be much faster but it could also allow the
> discovery process to skip entire directories.  If anyone is interested in
> working on a fix for this I'd be happy to point them at the files that will
> need to be changed and go into a more detailed discussion of potential
> solutions.
>
>
> On Thu, Aug 4, 2022 at 5:53 AM Tomaz Maia Suller <[email protected]>
> wrote:
>
> Hi David,
>
> I wonder if the problem with the attachments has to do with the files not
> having extensions... I'm trying to send them with .prof this time.
>
> Anyway:
>
>    1. I'm writing to a local filesystem; I've mounted an NTFS partition
>    which is on an HDD. Since the dataset is only ~1.5 GB, I'll try to move it
>    to the SSD I have available and see if I get lower access times.
>    2. I'm trying to use ParquetDataset; though I'm not using it
>    directly most of the time, i.e. I'm using Pandas which then itself uses (if
>    I understood it correctly) ParquetDataset.
>
> I've tried accessing with both the legacy and new versions of the API,
> according to that use_legacy_dataset parameter. The legacy API is
> significantly faster, with access time of about 1 second, though still
> ridiculously slow compared to accessing the path straight away.
>
> If the attachments still don't work for some reason, I'll write up what I
> ran:
>
> >>> pq_command_new = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=False)"
> >>> pq_command_old = "pq.ParquetDataset('.', filters=[('state_code', '==',
> 31), ('city_code', '==', 6200)], use_legacy_dataset=True)"
> >>> pq_baseline = "pq.ParquetDataset('./state_code=31/city_code=6200')"
>
> >>> cProfile.run(pq_command_new, '/tmp/pq_legacy_false.prof')
> This took about 17 seconds.
>
> >>> cProfile.run(pq_command_old, '/tmp/pq_legacy_true.prof')
> This took about 1 second.
>
> >>> cProfile.run(pq_baseline, '/tmp/pq_legacy_true.prof')
> This took 0.0075 second.
>
> These runs were all after the first run after booting up the computer,
> which took over 500 seconds as I've said.
>
> I'm starting to think I should send this to the development mailing list
> rather than the user one, since the obvious solution is specifying the
> paths directly rather than trying to use the API.
> ------------------------------
> *From:* Lee, David <[email protected]>
> *Sent:* Wednesday, August 3, 2022 19:49
> *To:* [email protected] <[email protected]>
> *Subject:* RE: Issue filtering partitioned Parquet files on partition
> keys using PyArrow
>
>
> The attachments didn’t come through properly..
>
>
>
> I’ve got additional questions.
>
>
>
>    1. What filesystem are these files stored on? I’ve seen issues using
>    S3 if HEAD operations aren’t prioritized. I’m assuming that without HEAD
>    operations you can’t effectively scan a parquet file’s footer and reading
>    the entire file isn’t efficient.
>
>
>
> *available (eventual consistency for HEAD operations)* Behaves the same
> as the “read-after-new-write” consistency level, but only provides eventual
> consistency for HEAD operations. Offers higher availability for HEAD
> operations than “read-after-new-write” if Storage Nodes are unavailable.
> Differs from AWS S3 consistency guarantees for HEAD operations only.
>
>
> 2.    Are you using pyarrow.parquet.ParquetDataset or pyarrow.dataset?
>
> https://arrow.apache.org/docs/python/parquet.html
>
> *Note*
>
> The ParquetDataset is being reimplemented based on the new generic Dataset
> API (see the Tabular Datasets
> <https://arrow.apache.org/docs/python/dataset.html#dataset> docs for an
> overview). This is not yet the default, but can already be enabled by
> passing the use_legacy_dataset=False keyword to *ParquetDataset* or
> *read_table()*:
>
> pq.ParquetDataset('dataset_name/', use_legacy_dataset=False)
>
> Enabling this gives the following new features:
>
>    - Filtering on all columns (using row group statistics) instead of
>    only on the partition keys.
>    - More fine-grained partitioning: support for a directory partitioning
>    scheme in addition to the Hive-like partitioning (e.g. “/2019/11/15/”
>    instead of “/year=2019/month=11/day=15/”), and the ability to specify a
>    schema for the partition keys.
>    - General performance improvement and bug fixes.
>
> It also has the following changes in behaviour:
>
>    - The partition keys need to be explicitly included in the columns keyword
>    when you want to include them in the result while reading a subset of the
>    columns
>
> This new implementation is already enabled in read_table, and in the
> future, this will be turned on by default for ParquetDataset. The new
> implementation does not yet cover all existing ParquetDataset features
> (e.g. specifying the metadata, or the pieces property API). Feedback is
> very welcome.
>
>
>
>
>
> *From:* Tomaz Maia Suller <[email protected]>
> *Sent:* Wednesday, August 3, 2022 2:54 PM
> *To:* [email protected]
> *Subject:* Issue filtering partitioned Parquet files on partition keys
> using PyArrow
>
>
>
>
> Hi,
>
>
>
> I'm trying to load a dataset I created consisting of Parquet files
> partitioned on two columns, but reading from a single partition takes over
> 10 minutes on the first try and still over 15 seconds on any subsequent one,
> while specifying the path to the partition directly takes 50 milliseconds.
> Am I doing something wrong?
>
>
>
> The data is arranged in the following way:
>
> $ tree -d
> .
> ├── state_code=11
> │   ├── city_code=1005
> │   ├── city_code=106
> │   ├── city_code=1104
> │   ├── city_code=114
> │   ├── city_code=1203
> │   ├── city_code=122
> │   ├── city_code=130
> │   ├── city_code=1302
> ...
>
>
>
> There are 27 state codes and 5560 city codes, so 27 "first level"
> partitions and 5560 "second level" partitions in total. Each partition
> often contains only a few kBs worth of Parquet files and none is greater
> than ~5MB. These files were written using PySpark and I have full control
> of how they're generated, in case you think there's a better way to arrange
> them. I chose this partitioning since I wish to analyse one city at a time;
> I have also experimented with having only a single level partitioning with
> 5560 partitions, but didn't see any increase in performance.
>
>
>
> I'm using Pandas to read the files, and have tried using PyArrow directly
> as well. Regardless, I've profiled the reading of a single partition using
> cProfile, and the results clearly show PyArrow is taking the longest to
> run. I've attached the results of two runs I did using IPython: one right
> after rebooting my computer, which took well over 500 seconds; and one
> executed right after that, which took about 15 seconds, with the following
> command:
>
>
>
> >>> command = "pd.read_parquet('.', engine='pyarrow',
> filters=[('state_code', '==', 31), ('city_code', '==', 6200)])"
>
> >>> cProfile.run(command, '/path/to/stats')
>
>
>
> It was much better the second time around but still terrible compared to
> specifying the path manually, which took around 50 milliseconds according
> to %timeit.
>
>
>
> I have absolutely no idea why the filesystem scan is taking so long. I
> have seen this issue https://issues.apache.org/jira/browse/ARROW-11781
>  related
> to the same problem, but it mentions there should be no performance issue
> as of July 2021, whereas I'm having problems right now.
>
>
>
> I think I'll stick to specifying the partitions to read "by hand", but I
> was really really curious on whether I messed something up, or if (Py)Arrow
> really is inefficient in a task which seems so trivial at first sight.
>
>
>
> Thanks,
>
> Tomaz.
>
>
>
> P.S.: It's my first time sending an email to a mailing list, so I hope
> sending attachments is okay, and sorry if it isn't.
>
>
>
>
>
>