As a follow-up, I can confirm that this works very well for non-partitioned
data, at least.

In my case the data are ‘partitioned’, and while the rest of the data are
parsed properly, the partition fields don’t seem to be extracted from the
filenames (they come back all null; see the output at the end of the script
below). Am I doing something incorrectly?

Thank you!
—
#!/usr/bin/env python3
import fsspec
import pyarrow as pa
import pyarrow.csv as pcsv
import pyarrow.dataset as pds

sample_file = (
    "https://firstratedata.com/_data/_deploy/stocks-complete_bundle_sample.zip"
)

# the sample files have no header row, so supply column names and types explicitly
schema = pa.schema(
    [
        pa.field("datetime", pa.timestamp("s")),
        pa.field("open", pa.float64()),
        pa.field("high", pa.float64()),
        pa.field("low", pa.float64()),
        pa.field("close", pa.float64()),
        pa.field("volume", pa.float64()),
    ],
)
read_opts, convert_opts = pcsv.ReadOptions(), pcsv.ConvertOptions()
convert_opts.column_types = schema
read_opts.column_names = schema.names
csvformat = pds.CsvFileFormat(convert_options=convert_opts, read_options=read_opts)

# expose the members of the remote ZIP through fsspec's zip filesystem
fsspec_fs = fsspec.filesystem("zip", fo=fsspec.open(sample_file))
csv_files = [_ for _ in fsspec_fs.ls("/") if _.endswith("_sample.txt")]
print(csv_files)
# ['AAPL_1hour_sample.txt', 'AAPL_1min_sample.txt', 'AAPL_30min_sample.txt',
# 'AAPL_5min_sample.txt', 'AMZN_1hour_sample.txt', 'AMZN_1min_sample.txt',
# 'AMZN_30min_sample.txt', 'AMZN_5min_sample.txt', 'MSFT_1hour_sample.txt',
# 'MSFT_1min_sample.txt', 'MSFT_30min_sample.txt', 'MSFT_5min_sample.txt']

part_schema = pa.schema([("symbol", pa.string()), ("frequency", pa.string())])
partitioning = pds.FilenamePartitioning(schema=part_schema)

# confirm filenames are parsed correctly
print({_: str(partitioning.parse(_)) for _ in csv_files})
# {
# "AAPL_1hour_sample.txt": '((symbol == "AAPL") and (frequency == "1hour"))',
# "AAPL_1min_sample.txt": '((symbol == "AAPL") and (frequency == "1min"))',
# "AAPL_30min_sample.txt": '((symbol == "AAPL") and (frequency == "30min"))',
# "AAPL_5min_sample.txt": '((symbol == "AAPL") and (frequency == "5min"))',
# "AMZN_1hour_sample.txt": '((symbol == "AMZN") and (frequency == "1hour"))',
# "AMZN_1min_sample.txt": '((symbol == "AMZN") and (frequency == "1min"))',
# "AMZN_30min_sample.txt": '((symbol == "AMZN") and (frequency == "30min"))',
# "AMZN_5min_sample.txt": '((symbol == "AMZN") and (frequency == "5min"))',
# "MSFT_1hour_sample.txt": '((symbol == "MSFT") and (frequency == "1hour"))',
# "MSFT_1min_sample.txt": '((symbol == "MSFT") and (frequency == "1min"))',
# "MSFT_30min_sample.txt": '((symbol == "MSFT") and (frequency == "30min"))',
# "MSFT_5min_sample.txt": '((symbol == "MSFT") and (frequency == "5min"))',
# }

ds_partitioned = pds.dataset(
    csv_files, format=csvformat, filesystem=fsspec_fs, partitioning=partitioning,
)
print(ds_partitioned.head(5))
# pyarrow.Table
# datetime: timestamp[s]
# open: double
# high: double
# low: double
# close: double
# volume: double
# symbol: string
# frequency: string
# ----
# datetime: [[2022-04-01 04:00:00,2022-04-01 05:00:00,2022-04-01 06:00:00,2022-04-01 07:00:00,2022-04-01 08:00:00]]
# open: [[175.25,175.32,175.43,175.54,175.49]]
# high: [[175.88,175.38,175.72,175.6,175.52]]
# low: [[175.1,175.04,175.33,174.69,173.35]]
# close: [[175.26,175.31,175.5,174.82,173.6]]
# volume: [[24417,13692,90057,162983,736016]]
# symbol: [[null,null,null,null,null]]
# frequency: [[null,null,null,null,null]]
# ^ the partition columns come back all null instead of the parsed values
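In the meantime, attaching the partition fields by hand works as a stopgap.
A rough sketch (assuming the SYMBOL_FREQUENCY_sample.txt naming shown above,
and reusing the read/convert options from the script):

# workaround sketch: read each member file directly and append the
# partition values split out of the filename as literal columns
tables = []
for path in csv_files:
    with fsspec_fs.open(path) as f:
        tbl = pcsv.read_csv(f, read_options=read_opts, convert_options=convert_opts)
    symbol, frequency = path.split("_")[:2]
    tbl = tbl.append_column("symbol", pa.array([symbol] * len(tbl)))
    tbl = tbl.append_column("frequency", pa.array([frequency] * len(tbl)))
    tables.append(tbl)
combined = pa.concat_tables(tables)
print(combined.column("symbol").unique())  # should list AAPL, AMZN, MSFT

Since the per-file reads and the filename layout are both fine, it looks like
only the dataset-level partitioning step isn't taking effect.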
On Wed, Jul 20, 2022 at 11:12 AM Kirby, Adam <[email protected]> wrote:
> Micah, Great idea, thank you! I really appreciate the pointer.
>
> On Wed, Jul 20, 2022 at 12:04 AM Micah Kornfield <[email protected]>
> wrote:
>
>> You could maybe use datasets on top of fsspec's zip file system [1]?
>>
>> [1]
>> https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/zip.html
>>
>> On Tuesday, July 19, 2022, Kirby, Adam <[email protected]> wrote:
>>
>>> Hi All,
>>>
>>> I'm currently using pyarrow.csv.read_csv to parse a CSV stream that
>>> originates from a ZIP of multiple CSV files. For now, I'm using a separate
>>> implementation to do the streaming ZIP decompression, then
>>> using pyarrow.csv.read_csv at each CSV file boundary.
>>>
>>> I would love if there were a way to leverage pyarrow to handle the
>>> decompression. From what I've seen in examples, a ZIP file containing a
>>> single CSV is supported -- that is, it's possible to operate on a
>>> compressed CSV stream -- but I wonder if it's possible to handle a
>>> compressed stream that contains multiple files?
>>>
>>> Thank you in advance!
>>>
>>
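P.S. For reference, the goal once the partition columns materialize is to be
able to prune on them; hypothetical usage against the script above:

# hypothetical usage once partitioning works: filter on the parsed fields
aapl_hourly = ds_partitioned.to_table(
    filter=(pds.field("symbol") == "AAPL") & (pds.field("frequency") == "1hour")
)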