Thank you very much for the helpful response, Alenka. This brings a lot more
clarity to the partitioning system and how I should be interacting with it. I’m
in the process of re-processing my dataset to use integers for the date
partitioning while still using strings for the site identifiers; I don’t need
to check those for anything other than equality, and it seems to be working.
I’ve also started properly enforcing column types, since there’s no need for
them to vary in this dataset, and I’ve had to reprocess a few other things
because the original dataset had some changes made halfway through.
If you don’t mind, I do have a few more questions regarding large datasets,
GCSFS/S3, and python performance.
I’ve read that it’s possible to write extra metadata through _metadata and
_common_metadata, and I get the impression that this is supposed to help with
dataset discovery without crawling large directory trees. My dataset has about
100 sites per day, each updating anywhere from 2-4 times per day. The archive
extends a few decades back in time, which works out to roughly 131K parquet
files per year on average (each parquet file is a single site observation with
about 5-6K rows and something like 12 columns).
My question is: is there a good strategy by which I can create and update this
metadata with every new observation added to the record? Or do I have to load
the entire dataset and then write new metadata? And would this even provide the
performance benefits I need? Right now, loading the dataset and applying
filters for a single year takes a combined 60 seconds or so, but for the entire
period of record it’s a few minutes. That’s not awful, and accessing individual
observations is easy, but I would like to squeeze as much performance as
possible out of the dataset as a whole, even on cloud storage. Based on my
timings of 1 year vs. the full record, the bulk of the execution time seems to
be in dataset creation and discovery, so if I can leverage _metadata and
_common_metadata to improve that, I’d like to.
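For what it’s worth, the incremental update I had in mind looks roughly like
the sketch below. It is untested: the append_observation helper is just
illustrative, and I’m assuming that metadata_collector records the relative
file paths and that append_row_groups / write_metadata_file compose the way
the docs suggest.

import gcsfs
import pyarrow as pa
import pyarrow.parquet as pq

fs = gcsfs.GCSFileSystem()
root = "global-radiosondes/hires_sonde"

def append_observation(table: pa.Table):
    # Write the new observation into the partitioned tree and collect the
    # FileMetaData of the file(s) that were just written.
    collector = []
    pq.write_to_dataset(table, root, filesystem=fs,
                        partition_cols=["year", "month", "day", "hour", "site"],
                        metadata_collector=collector)

    # Merge the new row groups into the existing _metadata footer (the initial
    # _metadata/_common_metadata would be bootstrapped once from a full crawl).
    with fs.open(root + "/_metadata", "rb") as f:
        merged = pq.read_metadata(f)
    for new_meta in collector:
        merged.append_row_groups(new_meta)

    # Rewrite the combined footer back to the bucket.
    with fs.open(root + "/_metadata", "wb") as f:
        merged.write_metadata_file(f)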
Here are the timings for loading the dataset and applying a filter operation
for a single year, and then the whole dataset.
Code:
import gcsfs
import pyarrow
import pyarrow.dataset as ds

fs = gcsfs.GCSFileSystem()

# Hive partition schema: integer dates, string site identifiers
partitioning = ds.HivePartitioning(
    pyarrow.schema([
        pyarrow.field('year', pyarrow.int32()),
        pyarrow.field('month', pyarrow.int32()),
        pyarrow.field('day', pyarrow.int32()),
        pyarrow.field('hour', pyarrow.int32()),
        pyarrow.field('site', pyarrow.string())
    ])
)

# Full column schema, including the partition fields
schema = pyarrow.schema([
    pyarrow.field('lon', pyarrow.float32()),
    pyarrow.field('lat', pyarrow.float32()),
    pyarrow.field('pres', pyarrow.float32()),
    pyarrow.field('hght', pyarrow.float32()),
    pyarrow.field('gpht', pyarrow.float32()),
    pyarrow.field('tmpc', pyarrow.float32()),
    pyarrow.field('dwpc', pyarrow.float32()),
    pyarrow.field('relh', pyarrow.float32()),
    pyarrow.field('uwin', pyarrow.float32()),
    pyarrow.field('vwin', pyarrow.float32()),
    pyarrow.field('wspd', pyarrow.float32()),
    pyarrow.field('wdir', pyarrow.float32()),
    pyarrow.field('year', pyarrow.int32()),
    pyarrow.field('month', pyarrow.int32()),
    pyarrow.field('day', pyarrow.int32()),
    pyarrow.field('hour', pyarrow.int32()),
    pyarrow.field('site', pyarrow.string())
])

%time data = ds.dataset("global-radiosondes/hires_sonde/year=2021", \
                        filesystem=fs, format="parquet", \
                        partitioning=partitioning, schema=schema)

subset = (ds.field("site") == "70026")

%time batches = list(data.to_batches(filter=subset, \
                                     columns=["pres", "hght", "tmpc", "wspd", "wdir", \
                                              "year", "month", "day", "hour"], \
                                     use_threads=True, use_async=True))
print(len(batches))
Time for ds.dataset (1 year):
Wall time: 20.1 s
Time for dataset.to_batches (1 year, filter selecting 1 site):
Wall time: 29.5 s
Time for ds.dataset (full record):
Wall time: 3min 29s
Time for dataset.to_batches (full record, filter selecting 1 year and 1 site):
Wall time: 28.7 s
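For completeness, the discovery-side call I was planning to test once a
_metadata file exists is something like the one below (also untested; I’m
assuming ds.parquet_dataset is the right entry point for building the dataset
from the footer instead of crawling the directory tree).

# Reuses fs, partitioning, and schema from the snippet above.
%time data = ds.parquet_dataset("global-radiosondes/hires_sonde/_metadata", \
                                filesystem=fs, \
                                partitioning=partitioning, \
                                schema=schema)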
Thanks,
Kelton.
> On Jan 12, 2022, at 10:06 AM, Alenka Frim <[email protected]> wrote:
>
> Hello Kelton,
>
> after playing around with the files you referenced and the code you shared,
> here is what I observed and what can be improved to make the code work:
>
> 1) Defining the partitioning of a dataset
>
> Running data.files on your dataset shows that the files are partitioned
> according to the hive structure. In this case the hive schema can be
> discovered from the directory structure - if "HivePartitioning" is
> selected. In your case, by supplying a list of names,
> "DirectoryPartitioning" is triggered and the filter cannot find the correct
> partitions.
>
> See: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.partitioning.html
> and https://issues.apache.org/jira/browse/ARROW-15310
>
> To make use of the hive structure you should either
> - use partitioning="hive" in ds.dataset, or
> - omit the partitioning argument in the ParquetDataset API (as hive is the
>   default).
> 2) Hive structure & integers in filters
>
> The second thing is the filters, as you guessed.
>
> If we use the Hive partitioning schema, we need to use integers, not
> strings, when supplying filters for the partitions. For example:
> ('year', '=', 2005).
> If you decide to keep the partitioning specified with a list anyway, and
> therefore the "DirectoryPartitioning" scheme, you would need to write the
> filters like so: ('year', '=', 'year=2005').
>
> Also be careful when filtering the month and day numbers (01 vs 1).
>
> 3) Column with a mismatching type
>
> It is possible that you will encounter an error afterwards (when calling
> to_table on a dataset or reading with ParquetDataset) as the data types from
> the files do not match 100% (for example "pres" can be int or double in your
> data). In this case I advise you to supply a schema that specifies these
> types as double. Do be careful to add partition names to the schema also.
>
> See: https://issues.apache.org/jira/browse/ARROW-15307
> and https://issues.apache.org/jira/browse/ARROW-15311
>
> To sum it all up, here is the code that worked for me:
>
> import gcsfs
> import pyarrow as pa
> import pyarrow.parquet as pq
> import pyarrow.dataset as ds
>
> fs = gcsfs.GCSFileSystem()
> data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
>                          use_legacy_dataset=False,
>                          filters=[
>                              ('year', '=', 2005),
>                              ('month', '=', 10),
>                              ('day', '=', 1),
>                              ('hour', '=', 0)])
> table = data.read(columns=["pres", "hght"])
> df = table.to_pandas()
>
> or:
>
> import gcsfs
> import pyarrow as pa
> import pyarrow.parquet as pq
> import pyarrow.dataset as ds
>
> fs = gcsfs.GCSFileSystem()
> data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
>                          partitioning=["year", "month", "day", "hour", "site"],
>                          use_legacy_dataset=False,
>                          filters=[
>                              ('year', '=', 'year=2005'),
>                              ('month', '=', 'month=10'),
>                              ('day', '=', 'day=1'),
>                              ('hour', '=', 'hour=0')])
> table = data.read(columns=["pres", "hght"])
> df = table.to_pandas()
>
> or:
> import gcsfs
> import pyarrow as pa
> import pyarrow.parquet as pq
> import pyarrow.dataset as ds
>
> fs = gcsfs.GCSFileSystem()
>
> schema = pa.schema([("pres", "double"), ("hght", "double"),
>                     ("year", "int32"), ("month", "int32"),
>                     ("day", "int32"), ("hour", "int32")])
>
> data = ds.dataset("global-radiosondes/hires_sonde", filesystem=fs,
>                   format="parquet", partitioning="hive", schema=schema)
> subset = (ds.field("year") == 2022) & (ds.field("month") == 1) \
>          & (ds.field("day") == 9) & (ds.field("hour") == 12)
> table = data.to_table(filter=subset)
>
> Hope this helps.
>
> Best,
> Alenka
>
> On Mon, Jan 10, 2022 at 1:02 AM Kelton Halbert <[email protected]> wrote:
> An example using the pyarrow.dataset api…
>
>
> data = ds.dataset("global-radiosondes/hires_sonde", filesystem=fs,
>                   format="parquet",
>                   partitioning=["year", "month", "day", "hour", "site"])
> subset = (ds.field("year") == "2022") & (ds.field("month") == "01") \
>          & (ds.field("day") == "09") & (ds.field("hour") == "12")
> batches = list(data.to_batches(filter=subset))
> print(batches)
>
> Output:
> []
>
>
>> On Jan 9, 2022, at 3:46 PM, Kelton Halbert <[email protected]> wrote:
>>
>> Hello - I’m not sure if this is a bug, or if I’m not using the API
>> correctly, but I have a partitioned parquet dataset stored on a Google Cloud
>> Bucket that I am attempting to load for analysis. However, when applying
>> filters to the dataset (using both the pyarrow.dataset and
>> pyarrow.parquet.ParquetDataset APIs), I receive empty data frames and tables.
>>
>> Here is my sample code:
>>
>> import matplotlib.pyplot as plt
>> import pyarrow.dataset as ds
>> import numpy as np
>> import gcsfs
>> import pyarrow.parquet as pq
>>
>> fs = gcsfs.GCSFileSystem()
>> data = pq.ParquetDataset("global-radiosondes/hires_sonde", filesystem=fs,
>>                          partitioning=["year", "month", "day", "hour", "site"],
>>                          use_legacy_dataset=False,
>>                          filters=[
>>                              ('year', '=', '2022'),
>>                              ('month', '=', '01'),
>>                              ('day', '=', '09'),
>>                              ('hour', '=', '12')])
>> table = data.read(columns=["pres", "hght"])
>> df = table.to_pandas()
>> print(df)
>>
>> With the following output:
>> Empty DataFrame
>> Columns: [pres, hght]
>> Index: []
>>
>>
>> Am I applying this incorrectly somehow? Any help would be appreciated.
>> Again, the same issue happens when using the pyarrow.dataset API to load as
>> well. The data bucket is public, so feel free to experiment. If I load the
>> whole dataset into a pandas data frame, it works fine. Issue seems to be the
>> filtering.
>>
>> Thanks,
>> Kelton.
>