As a quick update in case anyone runs across this, this has been fixed
upstream (9.0.0) by: https://github.com/apache/arrow/pull/12977

On Thu, Jul 21, 2022 at 3:18 PM Kirby, Adam <[email protected]> wrote:

> One more follow-up here. The addition of the below statement seems to coax
> out an error. Does it appear that the filenames aren't making their way to
> the routines that extract the fields from the filenames?
>
> FWIW, this error seems to be coming from here:
> https://github.com/apache/arrow/blob/6e3f26af658bfca602e711ea326f1985b62bca1d/cpp/src/arrow/dataset/partition.cc#L511
>
> partitioning = pds.FilenamePartitioning(schema=part_schema).discover(
> schema=part_schema)
> ds_partitioned = pds.dataset(
> csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=
> partitioning,
> )
> # Traceback (most recent call last):
> # File "/zip_of_csvs_test.py", line 82, in <module>
> # ds_partitioned = pds.dataset(
> # File
> "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
> line 697, in dataset
> # return _filesystem_dataset(source, **kwargs)
> # File
> "/.pyenv/versions/3.8.2/lib/python3.8/site-packages/pyarrow/dataset.py",
> line 449, in _filesystem_dataset
> # return factory.finish(schema)
> # File "pyarrow/_dataset.pyx", line 1857, in
> pyarrow._dataset.DatasetFactory.finish
> # File "pyarrow/error.pxi", line 144, in
> pyarrow.lib.pyarrow_internal_check_status
> # File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> # pyarrow.lib.ArrowInvalid: No non-null segments were available for field
> 'frequency'; couldn't infer type
>
>
>
> On Wed, Jul 20, 2022 at 11:19 PM Kirby, Adam <[email protected]> wrote:
>
>> As a follow-up, I can confirm that this appears to work very well for
>> non-partitioned data at least.
>> In my case, the data are ‘partitioned’ and while the rest of the data are
>> parsed properly, the partition fields don’t seem to be being extracted from
>> the filenames. Does it appear that I am doing something incorrectly?
>>
>> Thank you!
>>
>> —
>>
>> #!/usr/bin/env python3
>> import fsspecimport pyarrow as paimport pyarrow.csv as pcsvimport 
>> pyarrow.dataset as pds
>>
>> sample_file = (
>>     
>> "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip";
>> )
>> schema = pa.schema(
>>     [
>>         pa.field("datetime", pa.timestamp("s")),
>>         pa.field("open", pa.float64()),
>>         pa.field("high", pa.float64()),
>>         pa.field("low", pa.float64()),
>>         pa.field("close", pa.float64()),
>>         pa.field("volume", pa.float64()),
>>     ],
>> )
>> read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
>> convert_opts.column_types = schema
>> read_opts.column_names = schema.names
>> csvformat = pds.CsvFileFormat(convert_options=convert_opts, 
>> read_options=read_opts)
>>
>> fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))
>>
>> csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
>> print(csv_files)# ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 
>> 'AAPL_30min_sample.txt',# 'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 
>> 'AMZN_1min_sample.txt',# 'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 
>> 'MSFT_1hour_sample.txt',# 'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 
>> 'MSFT_5min_sample.txt']
>>
>> part_schema = pa.schema([("symbol", pa.string()), ("frequency", 
>> pa.string())])
>> partitioning = pds.FilenamePartitioning(schema=part_schema)
>> # confirm filenames are parsed correctly
>> print({_: str(partitioning.parse(_)) for _ in csv_files})# {#     
>> "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',# 
>>     "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == 
>> "1min"))',#     "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency 
>> == "30min"))',#     "AAPL_5min_sample.txt": '((symbol == "AAPL") and 
>> (frequency == "5min"))',#     "AMZN_1hour_sample.txt": '((symbol == "AMZN") 
>> and (frequency == "1hour"))',#     "AMZN_1min_sample.txt": '((symbol == 
>> "AMZN") and (frequency == "1min"))',#     "AMZN_30min_sample.txt": '((symbol 
>> == "AMZN") and (frequency == "30min"))',#     "AMZN_5min_sample.txt": 
>> '((symbol == "AMZN") and (frequency == "5min"))',#     
>> "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',# 
>>     "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == 
>> "1min"))',#     "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency 
>> == "30min"))',#     "MSFT_5min_sample.txt": '((symbol == "MSFT") and 
>> (frequency == "5min"))',# }
>>
>> ds_partitioned = pds.dataset(
>>     csv_files, format=csvformat, filesystem=fsspec_fs, 
>> partitioning=partitioning,
>> )
>>
>> print(ds_partitioned.head(5))# pyarrow.Table# datetime: timestamp[s]# open: 
>> double# high: double# low: double# close: double# volume: double# symbol: 
>> string# frequency: string# ----# datetime: [[2022-04-01 04:00:00,2022-04-01 
>> 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]# 
>> open: [[175.25,175.32,175.43,175.54,175.49]]# high: 
>> [[175.88,175.38,175.72,175.6,175.52]]# low: 
>> [[175.1,175.04,175.33,174.69,173.35]]# close: 
>> [[175.26,175.31,175.5,174.82,173.6]]# volume: 
>> [[24417,13692,90057,162983,736016]]# symbol: [[null,null,null,null,null]]# 
>> frequency: [[null,null,null,null,null]]
>>
>>
>> On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <[email protected]> wrote:
>>
>>> Micah, Great idea, thank you! I really appreciate the pointer.
>>>
>>> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <[email protected]>
>>> wrote:
>>>
>>>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>>>
>>>> [1]
>>>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>>>
>>>> On Tuesday, July 19, 2022, Kirby, Adam <[email protected]> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>>>> implementation to do the streaming ZIP decompression, then
>>>>> using pyarrow.csv.read_csv at each CSV file boundary.
>>>>>
>>>>> I would love if there were a way to leverage pyarrow to handle the
>>>>> decompression. From what I've seen in examples, a ZIP file containing a
>>>>> single CSV is supported -- that is, it's possible to operate on a
>>>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>>>> compressed stream that contains multiple files?
>>>>>
>>>>> Thank you in advance!
>>>>>
>>>>

Reply via email to