Hi Kelton,
I was looking into it a bit, and this seems to be some kind of bug in the
gcsfs package (or fsspec).
When looking at the dataset object that gets created with your initial
example, we can see:
>>> data.files
['global-radiosondes/hires-sonde']
So this indicates that, for some reason, it is seeing this top-level
directory as the single file that makes up the partitioned dataset (the
original error message you showed also indicates it is trying to open that
path as if it were a file). Normally, this property should give a list of
all discovered files in the partitioned dataset.
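For a correctly discovered dataset, you would instead expect something like
this (the exact file names here are hypothetical):
>>> data.files  # hypothetical output for a healthy dataset
['global-radiosondes/hires-sonde/year=2016/month=5/day=24/hour=19/WMO=72451/part-0.parquet',
 ...]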
The dataset discovery gets this information from the filesystem, and it
seems that this is behaving a bit strangely. Under the hood, it is calling
the `info()` method of an fsspec-like filesystem. If I do this manually, I
get a different result for the first vs subsequent call:
In [1]: import gcsfs
In [2]: fs = gcsfs.GCSFileSystem(token="anon")
In [3]: fs.info("global-radiosondes/hires-sonde")
Out[3]:
{'kind': 'storage#object',
'id': 'global-radiosondes/hires-sonde//1644725282206197',
'selfLink': 'https://www.googleapis.com/storage/v1/b/global-radiosondes/o/hires-sonde%2F',
'mediaLink': 'https://storage.googleapis.com/download/storage/v1/b/global-radiosondes/o/hires-sonde%2F?generation=1644725282206197&alt=media',
'name': 'global-radiosondes/hires-sonde/',
'bucket': 'global-radiosondes',
'generation': '1644725282206197',
'metageneration': '1',
'contentType': 'text/plain',
'storageClass': 'STANDARD',
'size': 0,
'md5Hash': '1B2M2Y8AsgTpgAmY7PhCfg==',
'crc32c': 'AAAAAA==',
'etag': 'CPXjy5Hn+/UCEAE=',
'temporaryHold': False,
'eventBasedHold': False,
'timeCreated': '2022-02-13T04:08:02.226Z',
'updated': '2022-02-13T04:08:02.226Z',
'timeStorageClassUpdated': '2022-02-13T04:08:02.226Z',
'type': 'file'}
In [4]: fs.info("global-radiosondes/hires-sonde")
Out[4]:
{'bucket': 'global-radiosondes',
'name': 'global-radiosondes/hires-sonde',
'size': 0,
'storageClass': 'DIRECTORY',
'type': 'directory'}
(To see this, you need to do it in a fresh Python session, not after
already trying `ds.dataset(..)`, because that will already have made the
first call.)
So for this reason, the `ds.dataset(..)` discovery thinks it is dealing
with a single file, and thus subsequently reading data fails.
As a quick workaround, can you try calling
`fs.info("global-radiosondes/hires-sonde")` once before calling
`ds.dataset(..)`? Does it then work?
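For example (a minimal sketch based on your snippet, without the schema and
partitioning for brevity; the extra `info()` call just triggers gcsfs's
first, incorrect response, so that the calls made later during dataset
discovery see a directory):

import gcsfs
import pyarrow.dataset as ds

fs = gcsfs.GCSFileSystem(token="anon")

# Warm-up call: the first info() on this path wrongly reports a file;
# once it has been made, subsequent calls correctly report a directory.
fs.info("global-radiosondes/hires-sonde")

data = ds.dataset("global-radiosondes/hires-sonde", filesystem=fs,
                  format="parquet")
# data.files should now list the individual parquet files
print(data.files[:5])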
I would report this to https://github.com/fsspec/gcsfs, as that seems like
a bug in gcsfs or fsspec.
Best,
Joris
On Mon, 21 Feb 2022 at 19:32, Kelton Halbert <[email protected]> wrote:
> Hi Alenka,
>
> Here is the code snippet that loads a single Parquet file. I can also
> confirm that the issue appears to be with the "fs.isfile" call on the
> root directory… calling this function myself returns False, as I would
> expect it to: fs.isfile("global-radiosondes/hires-sonde")
>
> import gcsfs
> import pyarrow
> import pyarrow.dataset as ds
>
> fs = gcsfs.GCSFileSystem(token="anon")
>
> partitioning = ds.HivePartitioning(
> pyarrow.schema([
> pyarrow.field('year', pyarrow.int32()),
> pyarrow.field('month', pyarrow.int32()),
> pyarrow.field('day', pyarrow.int32()),
> pyarrow.field('hour', pyarrow.int32()),
> pyarrow.field('WMO', pyarrow.string())
> ])
> )
>
> schema = pyarrow.schema([
> pyarrow.field('lon', pyarrow.float32()),
> pyarrow.field('lat', pyarrow.float32()),
> pyarrow.field('pres', pyarrow.float32()),
> pyarrow.field('hght', pyarrow.float32()),
> pyarrow.field('gpht', pyarrow.float32()),
> pyarrow.field('tmpc', pyarrow.float32()),
> pyarrow.field('dwpc', pyarrow.float32()),
> pyarrow.field('relh', pyarrow.float32()),
> pyarrow.field('uwin', pyarrow.float32()),
> pyarrow.field('vwin', pyarrow.float32()),
> pyarrow.field('wspd', pyarrow.float32()),
> pyarrow.field('wdir', pyarrow.float32()),
> pyarrow.field('year', pyarrow.int32()),
> pyarrow.field('month', pyarrow.int32()),
> pyarrow.field('day', pyarrow.int32()),
> pyarrow.field('hour', pyarrow.int32()),
> pyarrow.field('WMO', pyarrow.string())
> ])
>
> data = ds.dataset(
>     "global-radiosondes/hires-sonde/year=2016/month=5/day=24/hour=19/WMO=72451",
>     filesystem=fs, format="parquet",
>     schema=schema, partitioning=partitioning)
>
> batches = data.to_batches(
>     columns=["pres", "gpht", "hght", "tmpc", "wspd", "wdir"],
>     use_threads=True)
>
> batches = list(batches)
> print(batches[0].to_pandas().head())
>
> Kelton.
>
>
> On Feb 21, 2022, at 3:07 AM, Alenka Frim <[email protected]> wrote:
>
> Hi Kelton,
>
> I can reproduce the same error if I try to load all the data with data =
> ds.dataset("global-radiosondes/hires-sonde", filesystem=fs) or data =
> pq.ParquetDataset("global-radiosondes/hires-sonde", filesystem=fs,
> use_legacy_dataset=False).
>
> Could you share your code, where you read a specific parquet file?
>
> Best,
> Alenka
>
> On Mon, Feb 21, 2022 at 12:04 AM Kelton Halbert <[email protected]>
> wrote:
>
>> Hello,
>>
>> I’ve been learning and working with PyArrow for a project to store some
>> atmospheric science data as part of a partitioned dataset, and recently
>> the dataset class with the fsspec/gcsfs filesystem has started producing
>> a new error. Unfortunately I cannot seem to track down what changed, or
>> whether it’s an error on my end. I’m using PyArrow 7.0.0 and Python 3.8.
>>
>> If I specify a specific parquet file, everything is fine - but if I give
>> it any of the directory partitions, the same issue occurs. Any guidance
>> here would be appreciated!
>>
>> The code:
>> import gcsfs
>> import pyarrow
>> import pyarrow.dataset as ds
>>
>> fs = gcsfs.GCSFileSystem(token="anon")
>>
>> partitioning = ds.HivePartitioning(
>> pyarrow.schema([
>> pyarrow.field('year', pyarrow.int32()),
>> pyarrow.field('month', pyarrow.int32()),
>> pyarrow.field('day', pyarrow.int32()),
>> pyarrow.field('hour', pyarrow.int32()),
>> pyarrow.field('WMO', pyarrow.string())
>> ])
>> )
>>
>> schema = pyarrow.schema([
>> pyarrow.field('lon', pyarrow.float32()),
>> pyarrow.field('lat', pyarrow.float32()),
>> pyarrow.field('pres', pyarrow.float32()),
>> pyarrow.field('hght', pyarrow.float32()),
>> pyarrow.field('gpht', pyarrow.float32()),
>> pyarrow.field('tmpc', pyarrow.float32()),
>> pyarrow.field('dwpc', pyarrow.float32()),
>> pyarrow.field('relh', pyarrow.float32()),
>> pyarrow.field('uwin', pyarrow.float32()),
>> pyarrow.field('vwin', pyarrow.float32()),
>> pyarrow.field('wspd', pyarrow.float32()),
>> pyarrow.field('wdir', pyarrow.float32()),
>> pyarrow.field('year', pyarrow.int32()),
>> pyarrow.field('month', pyarrow.int32()),
>> pyarrow.field('day', pyarrow.int32()),
>> pyarrow.field('hour', pyarrow.int32()),
>> pyarrow.field('WMO', pyarrow.string())
>> ])
>>
>> data = ds.dataset("global-radiosondes/hires-sonde", filesystem=fs,
>>                   format="parquet", partitioning=partitioning,
>>                   schema=schema)
>>
>> subset = (ds.field("year") == 2016) & (ds.field("WMO") == "72451")
>>
>> batches = data.to_batches(
>>     columns=["pres", "gpht", "tmpc", "wspd", "wdir",
>>              "year", "month", "day", "hour"],
>>     use_threads=True)
>>
>> batches = list(batches)
>>
>> The error:
>>
>>     391 from pyarrow import PythonFile
>>     393 if not self.fs.isfile(path):
>> --> 394     raise FileNotFoundError(path)
>>     396 return PythonFile(self.fs.open(path, mode="rb"), mode="r")
>>
>> FileNotFoundError: global-radiosondes/hires-sonde/