Hi list,
I am looking for a way to add a new column to an existing table, computed
as the sum/mean of other columns. From the docs, I understand that pyarrow
compute functions operate on arrays (i.e. columns), but I cannot find
whether it is possible to aggregate across columns in some way.
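The closest I could come up with in plain pyarrow is explicit element-wise
arithmetic on a fixed pair of columns, roughly like this (I am not sure it
is the intended approach):

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({'a': [1, 2, 3, 4], 'b': [2, 4, 6, 8]})
# element-wise sum of the two columns, then divide to get the row-wise mean
row_sum = pc.add(table['a'], table['b'])
row_mean = pc.divide(pc.cast(row_sum, pa.float64()), 2)
table = table.append_column('mean', row_mean)

But this means spelling out the columns one by one, so I wonder if there is
a proper row-wise aggregation I am missing.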
In addition, I use the Scanner API to load and re-save a parquet dataset in
order to consolidate the small files in each partition, following the
example from the docs [1]. Now I would like to extend that consolidation
step by adding new columns before saving the new dataset.
Here is an example of what I want to achieve. In this case I am using
pandas and scanner.to_batches():
import pyarrow.dataset as ds
import pyarrow as pa
from glob import glob
from uuid import uuid4
# Create the input dataset
data_dict = {'partition': [1, 1, 2, 2],
             'a': [1, 2, 3, 4],
             'b': [2, 4, 6, 8]}
table = pa.Table.from_pydict(data_dict)
in_arrow_path = 'example_input'
out_arrow_path = 'example_output'
ds.write_dataset(table, in_arrow_path, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='delete_matching',
                 basename_template=f'{uuid4()}-{{i}}')
print('\n'.join(glob(f'{in_arrow_path}/**/*')))
dataset = ds.dataset(in_arrow_path, partitioning='hive')
print(dataset.to_table().to_pandas())
# Re-save the input dataset adding a new column ("consolidation")
scanner = dataset.scanner()
cols = ['a', 'b']
for batch in scanner.to_batches():
    df = batch.to_pandas()
    df['mean'] = df[cols].mean(axis=1)
    new_batch = pa.RecordBatch.from_pandas(df)
    ds.write_dataset(
        new_batch, out_arrow_path,
        format='parquet',
        partitioning=['partition'],
        partitioning_flavor='hive',
        existing_data_behavior='delete_matching',
        basename_template=f'{uuid4()}-{{i}}.parquet'
    )
print('\n'.join(glob(f'{out_arrow_path}/**/*')))
new_dataset = ds.dataset(out_arrow_path, partitioning='hive')
print(new_dataset.to_table().to_pandas())
So the questions are:
1. Can I add the new column (mean) directly in pyarrow?
2. Do I need to write a loop batch by batch as above, or is there a way to
apply the transformation through the Scanner API as in [1]? A rough sketch
of what I was hoping for is below.
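For question 2, this is roughly what I had in mind, using a projection on
the scanner instead of the loop (I am guessing at the columns-as-expressions
part, so the exact calls may well be wrong):

from uuid import uuid4
import pyarrow as pa
import pyarrow.dataset as ds

# in_arrow_path / out_arrow_path as defined above
dataset = ds.dataset(in_arrow_path, partitioning='hive')
# keep the existing columns and add a derived 'mean' column;
# the cast is my guess at avoiding integer division
projection = {
    'partition': ds.field('partition'),
    'a': ds.field('a'),
    'b': ds.field('b'),
    'mean': (ds.field('a') + ds.field('b')).cast(pa.float64()) / 2,
}
scanner = dataset.scanner(columns=projection)
ds.write_dataset(scanner, out_arrow_path, format='parquet',
                 partitioning=['partition'],
                 partitioning_flavor='hive',
                 existing_data_behavior='delete_matching',
                 basename_template=f'{uuid4()}-{{i}}.parquet')

If something like this is supported, it would avoid the batch-by-batch loop
entirely.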
Thanks for any advice.
Antonio
[1]
https://arrow.apache.org/docs/python/dataset.html#writing-large-amounts-of-data