Hi Antonio,

Did you try the `pyarrow.compute` functions? Inside that batch iterator loop you can compute the mean with a compute function and then attach it to the batch as a new column (e.g. with add_column). The latest Arrow code base might also have support for 'projection', which could do this without having to iterate through the record batches. @Weston Pace <[email protected]> WDYT?
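Something along these lines should work inside the loop (a rough, untested sketch; I rebuild the batch with RecordBatch.from_arrays rather than add_column, since add_column may only be available on Table depending on your pyarrow version, and I build the mean from pc.add / pc.divide with a cast to float64 because I am not aware of a row-wise mean kernel):

import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
from uuid import uuid4

dataset = ds.dataset('example_input', partitioning='hive')

for batch in dataset.scanner().to_batches():
    # element-wise (a + b) / 2, computed without going through pandas
    total = pc.add(batch['a'], batch['b'])
    mean = pc.divide(total.cast(pa.float64()), 2.0)
    # rebuild the batch with the extra column appended
    new_batch = pa.RecordBatch.from_arrays(
        batch.columns + [mean],
        names=batch.schema.names + ['mean'])
    ds.write_dataset(
        new_batch, 'example_output',
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        existing_data_behavior='delete_matching',
        basename_template=f'{uuid4()}-{{i}}.parquet')

And if the projection support is available in your version, something like the following might avoid the explicit loop entirely (again only a sketch: it assumes the scanner's columns argument accepts a dict mapping output names to expressions, that expressions support arithmetic and cast, and that write_dataset accepts a scanner directly; I believe only newer releases have all of these):

import pyarrow as pa
import pyarrow.dataset as ds

dataset = ds.dataset('example_input', partitioning='hive')

# keep all existing columns and add a derived 'mean' expression
projection = {name: ds.field(name) for name in dataset.schema.names}
projection['mean'] = (ds.field('a') + ds.field('b')).cast(pa.float64()) / 2

scanner = dataset.scanner(columns=projection)
ds.write_dataset(scanner, 'example_output', format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='delete_matching')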
On Mon, Jan 24, 2022 at 7:39 AM Antonino Ingargiola <[email protected]> wrote:

> Hi list,
>
> I am looking for a way to add a new column to an existing table that is
> computed as the sum/mean of other columns. From the docs, I understand
> that pyarrow compute functions operate on arrays (i.e. columns), but I
> cannot find whether it is possible to aggregate across columns in some way.
>
> In addition, I use the scanner API to load and re-save a parquet dataset
> to consolidate the small files in each partition, following the example
> from the docs [1]. Now I would like to extend the consolidation step by
> adding new columns before saving the new dataset.
>
> Here is an example of what I want to achieve. In this case I am using
> pandas and scanner.to_batches():
>
> import pyarrow.dataset as ds
> import pyarrow as pa
> from glob import glob
> from uuid import uuid4
>
> # Create the input dataset
> data_dict = {'partition': [1, 1, 2, 2],
>              'a': [1, 2, 3, 4],
>              'b': [2, 4, 6, 8]}
> table = pa.Table.from_pydict(data_dict)
>
> in_arrow_path = 'example_input'
> out_arrow_path = 'example_output'
>
> ds.write_dataset(table, in_arrow_path, format='parquet',
>                  partitioning=['partition'],
>                  partitioning_flavor='hive',
>                  existing_data_behavior='delete_matching',
>                  basename_template=f'{uuid4()}-{{i}}')
>
> print('\n'.join(glob(f'{in_arrow_path}/**/*')))
> dataset = ds.dataset(in_arrow_path, partitioning='hive')
> print(dataset.to_table().to_pandas())
>
> # Re-save the input dataset adding a new column ("consolidation")
> scanner = dataset.scanner()
> cols = ['a', 'b']
>
> for batch in scanner.to_batches():
>     df = batch.to_pandas()
>     df['mean'] = df[cols].mean(axis=1)
>     new_batch = pa.RecordBatch.from_pandas(df)
>     ds.write_dataset(
>         new_batch, out_arrow_path,
>         format='parquet',
>         partitioning=['partition'],
>         partitioning_flavor='hive',
>         existing_data_behavior='delete_matching',
>         basename_template=f'{uuid4()}-{{i}}.parquet'
>     )
>
> print('\n'.join(glob(f'{out_arrow_path}/**/*')))
> new_dataset = ds.dataset(out_arrow_path, partitioning='hive')
> print(new_dataset.to_table().to_pandas())
>
> So the questions are:
>
> 1. Can I add the new column (mean) directly in pyarrow?
> 2. Do I need to write a loop batch by batch as above, or is there a way
> to apply the transformation through the scanner API like in [1]?
>
> Thanks for any advice.
>
> Antonio
>
> [1]
> https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data
>

--
Niranda Perera
https://niranda.dev/
@n1r44 <https://twitter.com/N1R44>
