Arrow is a great in-memory representation to use for this kind of problem.
However, keep in mind that Arrow, the format, does not actually include
compute capabilities.  Much of your question is asking about things that
would normally be considered "compute operations" and the capabilities and
techniques are going to differ depending on which tools you are using (even
if all of the tools are using Arrow as the format).

> Have I used Arrow in the best possible way here?  Or are there better
> ways to approach this problem?

This seems like a pretty reasonable approach.

>    for idx, val in enumerate(table.column(key)):
>        ptn = val.as_py() % nways
>        partitions[ptn].append(idx)

For performance, you almost never want to do any kind of "per-row"
operation in Python.  Ideally that work is always done by vectorized
compute functions.  Regrettably, there is not yet a compute function for
modulo (for some reason this particular function seems to be cursed :).
If you're ok with requiring that nways be a power of two, you can use
pc.bit_wise_and (pc being pyarrow.compute), because key & (nways - 1)
gives the same result as key % nways in that case.

mask = nways - 1
partitions = pc.bit_wise_and(table.column(key), mask)

> result = []
> for idx, indexes in enumerate(partitions):
>     mask = [False for i in range(0, len(table))]
>     for i in indexes:
>         mask[i] = True
>     result.append(table.filter(mask))

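Same thing here.  These loops visit every row in Python (and build a
len(table)-sized Python list for every partition), which you want to try
and avoid.  Given the per-row partition numbers in partitions, for example...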

result = []
for ptn in range(nways):
    # True for every row that belongs to partition ptn
    mask = pc.equal(partitions, ptn)
    result.append(table.filter(mask))

I don't know what compute.diff is, so I can't say whether it is implemented
efficiently in native code or as per-row Python.

> I can see I have plenty of spare CPU capacity when this code is running
> so I'd like to be able to diff all the partitions concurrently.  Does Arrow
> provide any support for true multithreading so I can get around Python's
> limited threading capabilities?

Yes, through Acero, but the "Acero cannot process a join of this magnitude"
error means that you probably cannot hand it the whole data set as a single
operation spanning all partitions.  That being said, all Arrow compute
functions release the GIL when they are running.  So you should be able to
use Python threading and still get multithreading benefits if you follow my
above advice and move the performance-critical parts into compute functions.



On Fri, Jul 7, 2023 at 5:55 AM Adrian Mowat <[email protected]> wrote:

> Hi Jacek,
>
> Yes, I would be able to re-write the extracts from Postgres.  Is there an
> easy way to partition the results of a SQL query or would I need to write
> something?
>
> Many thanks
>
> Adrian
>
> On Fri, 7 Jul 2023 at 12:53, Jacek Pliszka <[email protected]>
> wrote:
>
>> Hi!
>>
>> If you have any influence over how data is dumped from postgres  - my
>> suggestion is to have it already partitioned then.
>>
>> This would make parallelization much easier,
>>
>> BR
>>
>> Jacek
>>
>> On Fri, 7 Jul 2023 at 12:21, Adrian Mowat <[email protected]> wrote:
>> >
>> > Hi,
>> >
>> > TL;DR: newbie question.  I have a pyarrow program that uses
>> > pyarrow.compute.diff to compare two 10M row tables.  In order to handle
>> > the large volume, I partition my data into subsets by primary key and
>> > diff each one individually.  It works but I don't think it performs as
>> > well as it could and I'm wondering if there are better ways to solve
>> > this problem.
>> >
>> > Full question:
>> >
>> > I'm new to Arrow and I'm running a proof of concept on using it to find
>> > differences between 2 large data sets.  My use case is that we have a
>> > job that dumps some postgres tables to S3 as JSON every night and I want
>> > to run a batch job that compares one day's data and the next so we can
>> > send the change sets to downstream systems in other parts of the
>> > company.  Some of the tables have over 30M rows of data (and growing) so
>> > I need something that is performant and can handle a large volume of
>> > data.
>> >
>> > For my proof of concept, I downloaded 10M rows of one of the tables
>> > from 1 day and then another 10M from the next day.  I then made separate
>> > "small" (100,000 rows) and "medium" (1M rows) sized subsets for
>> > development.  If this exercise is successful and we decide to go ahead
>> > with a project using Arrow, it will probably be written in Ruby because
>> > that's the main language at my company.  I used Python for the proof of
>> > concept because it seems better documented and more widely used so it
>> > should be easier to find documentation etc.
>> >
>> > My first attempt was to load the data into Arrow tables and then use
>> > compute.diff to find the differences.  This worked fine for the small
>> > and medium sized data sets but when I ran it against the "large" (10M
>> > row) data my program failed with an error:
>> >
>> > There are more than 2^32 bytes of key data.  Acero cannot process a
>> > join of this magnitude
>> >
>> > This is obviously a problem so I decided to partition the data into
>> > subsets based on the primary key like this:
>> >
>> > def partition_by_key(table, key, nways):
>> >     partitions = [[] for i in range(0, nways)]
>> >
>> >     # Slice up key values into nways partitions
>> >     for idx, val in enumerate(table.column(key)):
>> >         ptn = val.as_py() % nways
>> >         partitions[ptn].append(idx)
>> >
>> >     # Use the key partitions to create nways
>> >     # masks over the data
>> >     result = []
>> >     for idx, indexes in enumerate(partitions):
>> >         mask = [False for i in range(0, len(table))]
>> >         for i in indexes:
>> >             mask[i] = True
>> >         result.append(table.filter(mask))
>> >
>> >     return result
>> >
>> > And then diff each partition in turn like this
>> >
>> > def diff_partitions(lhs_parts, rhs_parts, primary_key,
>> >                     find_changes_expression):
>> >     inserted = []
>> >     changed = []
>> >     deleted = []
>> >     for lhs, rhs in zip(lhs_parts, rhs_parts):
>> >         i, c, d = compute.diff(lhs, rhs, primary_key,
>> >                                find_changes_expression)
>> >         inserted.append(i)
>> >         changed.append(c)
>> >         deleted.append(d)
>> >     return inserted, changed, deleted
>> >
>> > So the main part of my program reads:
>> >
>> > # utility function that loads all the JSONL
>> > # files in a directory into an Arrow table
>> > lhs = jsonl.load_jsonl(lhs_dir, schema)
>> > rhs = jsonl.load_jsonl(rhs_dir, schema)
>> >
>> > num_rows = max(len(lhs), len(rhs))
>> > nways = multiprocessing.cpu_count()
>> >
>> > lhs_parts = partition_by_key(lhs, primary_key, nways)
>> > rhs_parts = partition_by_key(rhs, primary_key, nways)
>> >
>> > # find_changes_expression is hard-coded elsewhere in the program
>> > inserted, changed, deleted = diff_partitions(lhs_parts, rhs_parts,
>> >     primary_key, find_changes_expression)
>> >
>> > This works but I have some questions:
>> >
>> > Have I used Arrow in the best possible way here?  Or are there better
>> > ways to approach this problem?
>> >
>> > One thing I don't like is that I am running compute.diff against each
>> > data partition one after another.  I can see I have plenty of spare CPU
>> > capacity when this code is running so I'd like to be able to diff all
>> > the partitions concurrently.  Does Arrow provide any support for true
>> > multithreading so I can get around Python's limited threading
>> > capabilities?
>> >
>> > In general, I've found the docs to be good at explaining how Arrow
>> > works but I couldn't find much about how to "think in arrow" when
>> > solving problems.  Is there anything out there I might want to look at?
>> >
>> > Many Thanks
>> >
>> > Adrian