Hi!

I am aware of 3 approaches:

1. The standard one:
https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html#pyarrow.dataset.write_dataset
though I believe I have only used it with Parquet, and not with the newest version.

2. Partition manually into separate files following a naming convention, though option 1 may be advanced enough by now that you can do everything with it.

3. Use row groups in Parquet files - then different threads/processes
can each read their own row group, even if you have one big file. There is a rough sketch of options 1 and 3 just below.
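
A minimal sketch of options 1 and 3 - just an illustration, the paths, column names and the "day" partitioning column are made-up placeholders, not anything from your data:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

table = pa.table({
    "id": [1, 2, 3, 4],
    "day": ["2023-07-06", "2023-07-06", "2023-07-07", "2023-07-07"],
    "value": [10.0, 20.0, 30.0, 40.0],
})

# Option 1: write_dataset creates one directory per partition value
# (hive style: out/day=2023-07-06/part-0.parquet, ...).
ds.write_dataset(
    table,
    base_dir="out",
    format="parquet",
    partitioning=ds.partitioning(pa.schema([("day", pa.string())]),
                                 flavor="hive"),
)

# Reading back with a filter only touches the matching directories.
subset = ds.dataset("out", format="parquet", partitioning="hive").to_table(
    filter=ds.field("day") == "2023-07-07"
)

# Option 3: one big Parquet file, but each worker reads its own row group.
pq.write_table(table, "big.parquet", row_group_size=2)
pf = pq.ParquetFile("big.parquet")
first_group = pf.read_row_group(0)  # another process would read group 1, etc.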

Note that some of this functionality may be missing in certain
environments - e.g. there is a filtering mechanism for Parquet files in
Azure blobs, but it did not work for me when decimal columns were
present - so test that everything works before committing to an approach.
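
For reference, a rough sketch of dataset-level filter pushdown reading Parquet from Azure blob storage through adlfs - assuming that is the kind of mechanism in question; the account, container, path and column names are placeholders:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem  # third-party fsspec filesystem for Azure

# Placeholder credentials and paths - replace with your own.
fs = AzureBlobFileSystem(account_name="myaccount", account_key="...")

dataset = ds.dataset("mycontainer/tables/orders",
                     filesystem=fs, format="parquet")

# Only row groups/files matching the filter should be fetched and decoded.
# Verify it against every column type you actually use (decimals included).
table = dataset.to_table(filter=ds.field("status") == "shipped")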

Best Regards,

Jacek



On Fri, 7 Jul 2023 at 14:55, Adrian Mowat <[email protected]> wrote:
>
> Hi Jacek,
>
> Yes, I would be able to re-write the extracts from Postgres.  Is there an 
> easy way to partition the results of a SQL query or would I need to write 
> something?
>
> Many thanks
>
> Adrian
>
> On Fri, 7 Jul 2023 at 12:53, Jacek Pliszka <[email protected]> wrote:
>>
>> Hi!
>>
>> If you have any influence over how data is dumped from Postgres - my
>> suggestion is to have it already partitioned then.
>>
>> This would make parallelization much easier.
>>
>> BR
>>
>> Jacek
>>
>> On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <[email protected]> wrote:
>> >
>> > Hi,
>> >
>> > TL;DR: newbie question.  I have a pyarrow program that uses
>> > pyarrow.compute.diff to compare two 10M row tables.  In order to handle
>> > the large volume, I partition my data into subsets by primary key and diff
>> > each one individually.  It works, but I don't think it performs as well as
>> > it could, and I'm wondering if there are better ways to solve this problem.
>> >
>> > Full question:
>> >
>> > I'm new to Arrow and I'm running a proof of concept on using it to find
>> > differences between 2 large data sets.  My use case is that we have a job
>> > that dumps some Postgres tables to S3 as JSON every night, and I want to
>> > run a batch job that compares one day's data with the next day's so we can
>> > send the change sets to downstream systems in other parts of the company.
>> > Some of the tables have over 30M rows of data (and growing), so I need
>> > something that is performant and can handle a large volume of data.
>> >
>> > For my proof of concept, I downloaded 10M rows of one of the tables from 1 
>> > day and then another 10M from the next day.  I then made separate "small" 
>> > (100,000 rows) and "medium" (1M rows) sized subsets for development.  If 
>> > this exercise is successful and we decide to go ahead with a project using 
>> > Arrow, it will probably be written in Ruby because that's the main 
>> > language at my company.  I used Python for the proof of concept because it 
>> > seems better documented and more widely used so it should be easier to 
>> > find documentation etc.
>> >
>> > My first attempt was to load the data into Arrow tables and then use
>> > compute.diff to find the differences.  This worked fine for the small and
>> > medium-sized data sets, but when I ran it against the "large" (10M) row
>> > data my program failed with an error:
>> >
>> > There are more than 2^32 bytes of key data.  Acero cannot process a join 
>> > of this magnitude
>> >
>> > This is obviously a problem, so I decided to partition the data into
>> > subsets based on the primary key like this:
>> >
>> > def partition_by_key(table, key, nways):
>> >     partitions = [[] for i in range(0, nways)]
>> >
>> >     # Slice up key values into nways partitions
>> >     for idx, val in enumerate(table.column(key)):
>> >         ptn = val.as_py() % nways
>> >         partitions[ptn].append(idx)
>> >
>> >     # Use the key partitions to create nways
>> >     # masks over the data
>> >     result = []
>> >     for idx, indexes in enumerate(partitions):
>> >         mask = [False for i in range(0, len(table))]
>> >         for i in indexes:
>> >             mask[i] = True
>> >         result.append(table.filter(mask))
>> >
>> >     return result
>> >
>> > And then diff each partition in turn like this:
>> >
>> > def diff_partitions(lhs_parts, rhs_parts, primary_key,
>> >                     find_changes_expression):
>> >     inserted = []
>> >     changed = []
>> >     deleted = []
>> >     for lhs, rhs in zip(lhs_parts, rhs_parts):
>> >         i, c, d = compute.diff(lhs, rhs, primary_key,
>> >                                find_changes_expression)
>> >         inserted.append(i)
>> >         changed.append(c)
>> >         deleted.append(d)
>> >     return inserted, changed, deleted
>> >
>> > So the main part of my program reads:
>> >
>> > # utility function that loads all the JSONL
>> > # files in a directory into an Arrow table
>> > lhs = jsonl.load_jsonl(lhs_dir, schema)
>> > rhs = jsonl.load_jsonl(rhs_dir, schema)
>> >
>> > num_rows = max(len(lhs), len(rhs))
>> > nways = multiprocessing.cpu_count()
>> >
>> > lhs_parts = partition_by_key(lhs, primary_key, nways)
>> > rhs_parts = partition_by_key(rhs, primary_key, nways)
>> >
>> > # find_changes_expression is hard-coded elsewhere in the program
>> > inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
>> >                                              primary_key, find_changes_expression)
>> >
>> > This works but I have some questions:
>> >
>> > Have I used Arrow in the best possible way here?  Or are there better ways 
>> > to approach this problem?
>> >
>> > One thing I don't like is that I am running compute.diff against each data
>> > partition one after another.  I can see I have plenty of spare CPU
>> > capacity when this code is running, so I'd like to be able to diff all the
>> > partitions concurrently.  Does Arrow provide any support for true
>> > multithreading so I can get around Python's limited threading capabilities?
>> >
>> > In general, I've found the docs to be good at explaining how Arrow works 
>> > but I couldn't find much about how to "think in arrow" when solving 
>> > problems.  Is there anything out there I might want to look at?
>> >
>> > Many Thanks
>> >
>> > Adrian
