As a quick update in case anyone runs across this: the issue has been fixed upstream (9.0.0) by https://github.com/apache/arrow/pull/12977
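With pyarrow >= 9.0.0 the FilenamePartitioning setup from the messages below should populate the partition columns. As a quick, untested sketch (assuming the ds_partitioned dataset built in the script quoted further down), the fields extracted from the file names can then be used in filters:

    import pyarrow.dataset as pds

    # Filter on the fields that FilenamePartitioning extracts from the file names.
    table = ds_partitioned.to_table(
        filter=(pds.field("symbol") == "AAPL") & (pds.field("frequency") == "1min")
    )
    print(table.num_rows)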
On Thu, Jul 21, 2022 at 3:18 PM Kirby, Adam <[email protected]> wrote:

One more follow-up here. The addition of the statement below seems to coax out an error. Does it appear that the filenames aren't making their way to the routines that extract the fields from the filenames?

FWIW, this error seems to be coming from here:
https://github.com/apache/arrow/blob/6e3f26af658bfca602e711ea326f1985b62bca1d/cpp/src/arrow/dataset/partition.cc#L511

    partitioning = pds.FilenamePartitioning(schema=part_schema).discover(
        schema=part_schema
    )
    ds_partitioned = pds.dataset(
        csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
    )
    # Traceback (most recent call last):
    #   File "/zip_of_csvs_test.py", line 82, in <module>
    #     ds_partitioned = pds.dataset(
    #   File "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py", line 697, in dataset
    #     return _filesystem_dataset(source, **kwargs)
    #   File "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py", line 449, in _filesystem_dataset
    #     return factory.finish(schema)
    #   File "pyarrow/_dataset.pyx", line 1857, in pyarrow._dataset.DatasetFactory.finish
    #   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
    #   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
    # pyarrow.lib.ArrowInvalid: No non-null segments were available for field 'frequency'; couldn't infer type
On Wed, Jul 20, 2022 at 11:19 PM Kirby, Adam <[email protected]> wrote:

As a follow-up, I can confirm that this appears to work very well, at least for non-partitioned data.
In my case, the data are "partitioned", and while the rest of the data are parsed properly, the partition fields don't seem to be extracted from the filenames. Does it appear that I am doing something incorrectly?

Thank you!

    #!/usr/bin/env python3
    import fsspec
    import pyarrow as pa
    import pyarrow.csv as pcsv
    import pyarrow.dataset as pds

    sample_file = (
        "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
    )
    schema = pa.schema(
        [
            pa.field("datetime", pa.timestamp("s")),
            pa.field("open", pa.float64()),
            pa.field("high", pa.float64()),
            pa.field("low", pa.float64()),
            pa.field("close", pa.float64()),
            pa.field("volume", pa.float64()),
        ],
    )
    read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
    convert_opts.column_types = schema
    read_opts.column_names = schema.names
    csvformat = pds.CsvFileFormat(convert_options=convert_opts, read_options=read_opts)

    fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))

    csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
    print(csv_files)
    # ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 'AAPL_30min_sample.txt',
    #  'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',
    #  'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 'MSFT_1hour_sample.txt',
    #  'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']

    part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
    partitioning = pds.FilenamePartitioning(schema=part_schema)
    # confirm filenames are parsed correctly
    print({_: str(partitioning.parse(_)) for _ in csv_files})
    # {
    #     "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',
    #     "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == "1min"))',
    #     "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency == "30min"))',
    #     "AAPL_5min_sample.txt": '((symbol == "AAPL") and (frequency == "5min"))',
    #     "AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency == "1hour"))',
    #     "AMZN_1min_sample.txt": '((symbol == "AMZN") and (frequency == "1min"))',
    #     "AMZN_30min_sample.txt": '((symbol == "AMZN") and (frequency == "30min"))',
    #     "AMZN_5min_sample.txt": '((symbol == "AMZN") and (frequency == "5min"))',
    #     "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',
    #     "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == "1min"))',
    #     "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency == "30min"))',
    #     "MSFT_5min_sample.txt": '((symbol == "MSFT") and (frequency == "5min"))',
    # }

    ds_partitioned = pds.dataset(
        csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
    )

    print(ds_partitioned.head(5))
    # pyarrow.Table
    # datetime: timestamp[s]
    # open: double
    # high: double
    # low: double
    # close: double
    # volume: double
    # symbol: string
    # frequency: string
    # ----
    # datetime: [[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]
    # open: [[175.25,175.32,175.43,175.54,175.49]]
    # high: [[175.88,175.38,175.72,175.6,175.52]]
    # low: [[175.1,175.04,175.33,174.69,173.35]]
    # close: [[175.26,175.31,175.5,174.82,173.6]]
    # volume: [[24417,13692,90057,162983,736016]]
    # symbol: [[null,null,null,null,null]]
    # frequency: [[null,null,null,null,null]]

On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <[email protected]> wrote:

Micah, great idea, thank you! I really appreciate the pointer.
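One way to work around the null partition columns before the upstream fix would be to bypass the partitioning machinery and attach the partition fields by hand: read each member on its own and append constant columns parsed from the file name. A rough, untested sketch reusing the names defined in the script above (the filename split assumes this particular naming scheme):

    import pyarrow as pa
    import pyarrow.csv as pcsv

    tables = []
    for path in csv_files:
        # e.g. "AAPL_1min_sample.txt" -> symbol="AAPL", frequency="1min"
        symbol, frequency = path.split("_")[:2]
        with fsspec_fs.open(path) as f:
            t = pcsv.read_csv(f, read_options=read_opts, convert_options=convert_opts)
        # Append the partition fields as constant columns for this member.
        t = t.append_column("symbol", pa.array([symbol] * len(t), pa.string()))
        t = t.append_column("frequency", pa.array([frequency] * len(t), pa.string()))
        tables.append(t)

    combined = pa.concat_tables(tables)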
On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <[email protected]> wrote:

You could maybe use datasets on top of fsspec's zip file system [1]?

[1] https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html

On Tuesday, July 19, 2022, Kirby, Adam <[email protected]> wrote:

Hi All,

I'm currently using pyarrow.csv.read_csv to parse a CSV stream that originates from a ZIP of multiple CSV files. For now, I'm using a separate implementation to do the streaming ZIP decompression, then using pyarrow.csv.read_csv at each CSV file boundary.

I would love it if there were a way to leverage pyarrow to handle the decompression. From what I've seen in examples, a ZIP file containing a single CSV is supported -- that is, it's possible to operate on a compressed CSV stream -- but I wonder if it's possible to handle a compressed stream that contains multiple files?

Thank you in advance!
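For completeness, a minimal sketch of the per-member approach described above, using only the standard library for the ZIP handling; the archive path and the ".txt" extension are assumptions for illustration, and the same ReadOptions/ConvertOptions used elsewhere in this thread could be passed to read_csv as well:

    import zipfile

    import pyarrow as pa
    import pyarrow.csv as pcsv

    tables = []
    with zipfile.ZipFile("stocks-complete_bundle_sample.zip") as zf:
        for name in zf.namelist():
            if not name.endswith(".txt"):
                continue
            with zf.open(name) as member:  # file-like object over one CSV member
                tables.append(pcsv.read_csv(member))

    # Works as long as every member yields the same schema.
    combined = pa.concat_tables(tables)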
