Hi Weston,

Many thanks for the complete example: it works like a charm!

The function `dataset.write_dataset` has a nice API, but I cannot figure
out how to use some of its arguments.

For example, it seems I should be able to change some Parquet format
parameters with the `file_options` argument, by passing a
`ParquetFileFormat.make_write_options()` object.

But I cannot find which **kwargs are accepted by `make_write_options()`.
The function `parquet.write_table` accepts several Parquet-specific
arguments. Are those the same arguments I can pass to
`ParquetFileFormat.make_write_options()`?
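
For reference, here is roughly what I am trying to write. The
`compression` keyword is only a guess on my part, borrowed from the
`parquet.write_table` arguments, and the output directory is just a
placeholder:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.Table.from_pydict({'partition': [1, 2], 'value': [10, 20]})

# Build the Parquet format object and its write options explicitly.
# NOTE: compression='zstd' is an assumption; I don't know whether
# make_write_options() actually accepts it.
parquet_format = ds.ParquetFileFormat()
write_options = parquet_format.make_write_options(compression='zstd')

# 'some_dataset_dir' is a placeholder output path.
ds.write_dataset(table, 'some_dataset_dir',
                 format=parquet_format,
                 file_options=write_options,
                 partitioning=['partition'],
                 partitioning_flavor='hive')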

Thanks,
Antonio


On Wed, Dec 15, 2021 at 1:09 AM Weston Pace <[email protected]> wrote:

> You may be able to meet this use case using the tabular datasets[1]
> feature of pyarrow.  A few thoughts:
>
> 1. The easiest way to get an "append" workflow with
> pyarrow.dataset.write_dataset is to use a unique basename_template for
> each write_dataset operation.   A uuid is helpful here.
> 2. As you mentioned, if your writes generate a bunch of small files,
> you will want to periodically compact your partitions.
> 3. Reads should not happen at the same time as writes or else you risk
> reading partial / incomplete files.
>
> Example:
>
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> import tempfile
> from glob import glob
> from uuid import uuid4
>
> tab1 = pa.Table.from_pydict({'partition': [1, 1, 2, 2], 'value': [1, 2, 3, 4]})
> tab2 = pa.Table.from_pydict({'partition': [1, 1, 2, 2], 'value': [5, 6, 7, 8]})
>
> with tempfile.TemporaryDirectory() as dataset_dir:
>     ds.write_dataset(tab1, dataset_dir, format='parquet',
>                      partitioning=['partition'],
>                      partitioning_flavor='hive',
>                      existing_data_behavior='overwrite_or_ignore',
>                      basename_template=f'{uuid4()}-{{i}}')
>     ds.write_dataset(tab2, dataset_dir, format='parquet',
>                      partitioning=['partition'],
>                      partitioning_flavor='hive',
>                      existing_data_behavior='overwrite_or_ignore',
>                      basename_template=f'{uuid4()}-{{i}}')
>
>     print('\n'.join(glob(f'{dataset_dir}/**/*')))
>
>     dataset = ds.dataset(dataset_dir)
>
>     print(dataset.to_table().to_pandas())
>
> [1] https://arrow.apache.org/docs/python/dataset.html
>
> On Tue, Dec 14, 2021 at 12:17 AM Antonino Ingargiola <[email protected]>
> wrote:
> >
> > Hi arrow community,
> >
> > I just subscribed to this mailing list. First, let me thank all the
> > contributors for this great project!
> >
> > I have a question about which pyarrow API to use for a specific use
> > case. I need to update/append data to a large partitioned parquet
> > dataset using pyarrow. I receive the data in small batches that are
> > transformed into small pandas dataframes. All the new dataframes have
> > the same schema. The data can be saved locally or on a cloud object
> > store (S3).
> >
> > When I receive a new batch, I need to update the parquet dataset with
> > the new rows in the pandas dataframe. Essentially, I need to save
> > additional xyz.parquet files in the appropriate partition subfolders,
> > without removing or overwriting pre-existing .parquet files in the same
> > partition folder.
> >
> > My goal is to end up with a dataset like this:
> >
> > parquet_dataset/
> >     partition=1/
> >         a.parquet
> >         b.parquet
> >         c.parquet
> >     partition=2/
> >         a.parquet
> >         b.parquet
> >
> > where each individual parquet file contains a single batch of data
> > (actually a single batch may be split across 2 or more partitions).
> >
> > Is there a preferred API to achieve this continuous update in pyarrow?
> >
> > I can implement all this logic manually but, ideally, I would like to
> > defer to pyarrow the task of splitting the input dataframe into
> > partitions and saving each chunk in the appropriate subfolder,
> > generating filenames that will not conflict with existing files. Is
> > this possible with the current pyarrow?
> >
> > PS: I understand that this fragmentation is not ideal for
> > reading/querying, but it lets me handle the update process quickly.
> > In any case, I periodically save a consolidated copy of the dataset
> > with one file per partition to improve read performance.
> >
> > Thanks in advance,
> > Antonio
