Hello Kelton,

playing around with the files you referenced and with the code you added
the following can be observed and improved to make the code work:

*1) Defining the partitioning of a dataset*

When running *data.files* on your dataset shows that the files are
partitioned according to the *hive *structure. In this case the hive schema
can be discovered from the directory structure - if the “HivePartitioning”
is selected. In your case, when supplying a list of names
“DirectoryPartitioning” is triggered and the filter can not find the
correct partitions.

See:
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html
and https://issues.apache.org/jira/browse/ARROW-15310

What you should do is

   - use *partition="hive"* in the *ds.dataset* or
   - omit the partition argument in the ParquetDataset API (as hive is the
   default) to make use of the hive structure.

*2) Hive structure & integers in filters*

The second thing are the filters, as you have guessed.

If we use Hive Partitioning schema we need to use integers not strings when
supplying filters for the partitions. For example: *('year', '=', 2005)*.
If you may decide to use the partitioning specified with a list anyways,
and so using the “DirectoryPartitioning” scheme, you would need to use the
filtering like so: *('year', '=', 'year=2005')*.

Also be careful when filtering the month and day numbers (01 vs 1).

*3) Column with a mismatching type*

It is possible that you will encounter an error afterwards (when
calling *to_table
on a dataset or reading with ParquetDataset)* as the data types from the
files do not match 100% (for example "pres" can be *int* or *double *in
your data). In this case I advise you to supply a schema that specifies
these types as double. Do be careful to add partition names to the schema
also.

See: https://issues.apache.org/jira/browse/ARROW-15307
and https://issues.apache.org/jira/browse/ARROW-15311

*Summing all up in the code that worked for me:*

import gcsfs
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

fs = gcsfs.GCSFileSystem()
data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
                         use_legacy_dataset=False,
                         filters=[
                            ('year', '=', 2005),
                            ('month', '=', 10),
                            ('day', '=', 1),
                            ('hour', '=', 0)])
table = data.read(columns=["pres", "hght"])
df = table.to_pandas()

or:

import gcsfs
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

fs = gcsfs.GCSFileSystem()
data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
                         partitioning=["year", "month", "day", "hour",
"site"],
                         use_legacy_dataset=False,
                         filters=[
                            ('year', '=', 'year=2005'),
                            ('month', '=', 'month=10'),
                            ('day', '=', 'day=1'),
                            ('hour', '=', 'hour=0')])
table = data.read(columns=["pres", "hght"])
df = table.to_pandas()

or:

import gcsfs
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

fs = gcsfs.GCSFileSystem()

schema = pa.schema([("pres", "double"), ("hght", "double"), ("year",
"int32"), ("month", "int32"), ("day", "int32"), ("hour","int32")])


data = ds.dataset("global-radiosondes/hires_sonde", filesystem=fs,
format="parquet", partitioning="hive", schema=schema)
subset = (ds.field("year") == 2022) & (ds.field("month") == 1) \
       & (ds.field("day") == 9) & (ds.field("hour") == 12)
table = data.to_table(filter=subset)

Hope this helps.

Best,
Alenka

On Mon, Jan 10, 2022 at 1:02 AM Kelton Halbert <[email protected]> wrote:

> An example using the pyarrow.dataset api…
>
>
> data = ds.dataset("global-radiosondes/hires_sonde", filesystem=fs,
> format="parquet",
>                          partitioning=["year", "month", "day", "hour",
> "site"])
> subset = (ds.field("year") == "2022") & (ds.field("month") == "01") \
>        & (ds.field("day") == "09") & (ds.field("hour") == "12")
> batches = list(data.to_batches(filter=subset))
> print(batches)
>
> Output:
>
> []
>
>
>
> On Jan 9, 2022, at 3:46 PM, Kelton Halbert <[email protected]> wrote:
>
> Hello - I’m not sure if this is a bug, or if I’m not using the API
> correctly, but I have a partitioned parquet dataset stored on a Google
> Cloud Bucket that I am attempting to load for analysis. However, when
> applying filters to the dataset (using both the pyarrow.dataset and
> pyarrow.parquet.ParquetDataset APIs), I receive empty data frames and
> tables.
>
> Here is my sample code:
>
> import matplotlib.pyplot as plt
> import pyarrow.dataset as ds
> import numpy as np
> import gcsfs
> import pyarrow.parquet as pq
>
> fs = gcsfs.GCSFileSystem()
> data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
>                          partitioning=["year", "month", "day", "hour",
> "site"],
>                          use_legacy_dataset=False,
>                          filters=[
>                             ('year', '=', '2022'),
>                             ('month', '=', '01'),
>                             ('day', '=', '09'),
>                             ('hour', '=', '12')])
> table = data.read(columns=["pres", "hght"])
> df = table.to_pandas()
> print(df)
>
> With the following output:
>
> Empty DataFrame
> Columns: [pres, hght]
> Index: []
>
>
>
> Am I applying this incorrectly somehow? Any help would be appreciated.
> Again, the same issue happens when using the pyarrow.dataset API to load as
> well. The data bucket is public, so feel free to experiment. If I load the
> whole dataset into a pandas data frame, it works fine. Issue seems to be
> the filtering.
>
> Thanks,
> Kelton.
>
>
>

Reply via email to