Hi Sasha,

So far we have two use scenarios that would need access to the intermediate state of
aggregate kernels during consumption:
1) A shuffle-free, single-stage distributed query engine. Our data is partitioned and
stored across multiple nodes, and we would like to create a query plan with multiple
fragments that retrieves the partitioned data from all of these nodes in parallel for
better performance. Data shuffling is non-trivial for us to implement, and we are
looking for a simpler approach. For aggregation queries, one way to do this seems to
be to split the aggregation into two steps: pre-aggregation and finalize/combine.
During pre-aggregation, the aggregation operator only consumes the data and stores the
intermediate results internally. The `finalize/combine` step then combines the
partitioned intermediate results into the final result.

2) A materialized view that stores the intermediate state of an aggregation. The
partially aggregated results (the intermediate state of the aggregation) are stored in
the materialized view on disk, so reading the materialized view is faster because only
the `finalize` computation is needed to produce the final results.

For some aggregate kernels such as `avg`, we could use two existing aggregate kernels
(sum/count) to maintain such intermediate state manually, but that requires developers
to understand how these kernels are implemented internally, which is probably not
easy, and new aggregate kernels may be added in the future.
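
For example, for `avg` the manual approach would look roughly like the sketch below
(assuming a non-null float64 input column; distributing the partitions and collecting
the partial results is handled elsewhere):

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <memory>
#include <vector>

namespace cp = arrow::compute;

// Per-partition intermediate state for avg, maintained manually.
struct MeanPartial {
  double sum = 0;
  int64_t count = 0;
};

// Pre-aggregation: run the existing sum/count kernels over one partition.
// Assumes a non-null float64 column.
arrow::Result<MeanPartial> PreAggregate(const std::shared_ptr<arrow::Array>& values) {
  ARROW_ASSIGN_OR_RAISE(arrow::Datum sum, cp::CallFunction("sum", {values}));
  ARROW_ASSIGN_OR_RAISE(arrow::Datum count, cp::CallFunction("count", {values}));
  return MeanPartial{
      std::static_pointer_cast<arrow::DoubleScalar>(sum.scalar())->value,
      std::static_pointer_cast<arrow::Int64Scalar>(count.scalar())->value};
}

// Finalize/combine: merge the per-partition states into the global mean.
double Finalize(const std::vector<MeanPartial>& partials) {
  MeanPartial total;
  for (const auto& p : partials) {
    total.sum += p.sum;
    total.count += p.count;
  }
  return total.sum / static_cast<double>(total.count);
}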

Since Acero's aggregate kernels already maintain such intermediate state internally, I
wonder if it would be possible to add APIs to the aggregate kernels for retrieving
this intermediate state, to enable these use scenarios (a rough sketch of what I mean
is below). Thanks.
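
Purely as an illustration, and not a concrete proposal (nothing like this exists in
Arrow/Acero today, and all of the names below are made up), such an API might look
like:

#include <arrow/api.h>

#include <memory>

// Hypothetical interface; nothing like this exists in Arrow/Acero today.
// The idea is that an aggregate kernel can expose its intermediate state as an
// ordinary Arrow structure, so it can be shipped between nodes or stored in a
// materialized view, and later loaded and combined.
class GroupedAggregateState {
 public:
  virtual ~GroupedAggregateState() = default;

  // Serialize the per-group intermediate state alongside the group keys
  // (e.g. for mean: one column of sums and one column of counts per group).
  virtual arrow::Result<std::shared_ptr<arrow::Table>> ExportState() const = 0;

  // Merge a previously exported partial state into this accumulator.
  virtual arrow::Status LoadState(const arrow::Table& partial_state) = 0;

  // Turn the accumulated state into the final aggregate values.
  virtual arrow::Result<std::shared_ptr<arrow::Table>> Finalize() = 0;
};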

Jiangtao

From: Sasha Krassovsky <[email protected]>
Date: Friday, July 7, 2023 at 2:21 PM
To: [email protected] <[email protected]>
Subject: Re: [C++][Acero] can Acero support distributed computation?
Yes, what you’ve said is correct for Mean. But my point earlier is that there 
should only be a few such special cases. A simple case would be e.g. Max, 
where Aggregate outputs Max and then merge outputs Max(Max).
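
For instance, gathering the per-node partial maxima into one array on the master and
re-running the same kernel (this assumes the `max` scalar aggregate function available
in recent Arrow versions):

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <memory>

// Merge step for Max: re-run the same "max" kernel over the per-node partial
// maxima, assumed here to have been gathered into a single array.
arrow::Result<arrow::Datum> MergeMax(const std::shared_ptr<arrow::Array>& partial_maxima) {
  return arrow::compute::CallFunction("max", {partial_maxima});
}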

Sasha


On July 6, 2023, at 23:13, Jiangtao Peng <[email protected]> wrote:

Sorry for my unclear wording.

Taking mean aggregation as an example: does the Aggregate node "output" the sum and
count values, the Accumulate node then take those sum and count values as "input", and
the "merge" produce sum(sums)/sum(counts) as the final "output"?
My question is how to implement Pre-Aggregation and Post-Aggregation using Acero.

Best,
Jiangtao


From: Sasha Krassovsky <[email protected]>
Date: Friday, July 7, 2023 at 1:25 PM
To: [email protected] <[email protected]>
Subject: Re: [C++][Acero] can Acero support distributed computation?
Can you clarify what you mean by “data flow”? Each machine will be executing 
the same query plan. The query plan will contain an operator called Shuffle, 
and above the Shuffle will be an Aggregate, and above that will be an 
Accumulate node. The SourceNode will read data from disk on each machine. The 
data will then be put through the ShuffleNode, partitioned, and sent to other 
machines. The other machines will perform their local aggregations until they 
all agree that input is finished. Next the data on each machine will be input 
into the Accumulate node, which ships it to the master. The master will then 
perform the merge on all of the data it receives from all the participating 
machines.
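
Roughly, the per-machine plan could be declared like the sketch below. Only
`table_source` and `aggregate` are real Acero factories here; the `shuffle` and
`accumulate` nodes and their options structs are hypothetical custom ExecNodes that
would have to be implemented and registered, and the column names `x` and `key` are
placeholders:

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>

#include <memory>
#include <string>
#include <utility>

namespace ac = arrow::acero;

// Hypothetical options; neither "shuffle" nor "accumulate" exists in Acero
// today, they would have to be written and registered as custom ExecNodes.
struct ShuffleNodeOptions : public ac::ExecNodeOptions {
  explicit ShuffleNodeOptions(int num_machines) : num_machines(num_machines) {}
  int num_machines;
};

struct AccumulateNodeOptions : public ac::ExecNodeOptions {
  explicit AccumulateNodeOptions(std::string master_address)
      : master_address(std::move(master_address)) {}
  std::string master_address;
};

// Per-machine plan: table_source -> shuffle -> aggregate (local) -> accumulate.
ac::Declaration MakeLocalPlan(std::shared_ptr<arrow::Table> local_data,
                              int num_machines, std::string master_address) {
  ac::Declaration source{"table_source",
                         ac::TableSourceNodeOptions{std::move(local_data)}};
  ac::Declaration shuffle{"shuffle", {std::move(source)},
                          ShuffleNodeOptions{num_machines}};
  ac::Declaration aggregate{
      "aggregate", {std::move(shuffle)},
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_sum", nullptr, "x", "sum(x)"}},
                               /*keys=*/{"key"}}};
  return ac::Declaration{"accumulate", {std::move(aggregate)},
                         AccumulateNodeOptions{std::move(master_address)}};
}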

Sasha



On July 6, 2023, at 22:17, Jiangtao Peng <[email protected]> wrote:

Sure, distributed aggregations can be split into two phases, “compute” and “merge”. 
But what does the data flow between “compute” and “merge” look like across different 
nodes?


Best,
Jiangtao

From: Sasha Krassovsky <[email protected]>
Date: Friday, July 7, 2023 at 11:07 AM
To: [email protected] <[email protected]>
Subject: Re: [C++][Acero] can Acero support distributed computation?
Distributed aggregations have two phases: “compute” (each node does its own 
aggregation) and “merge” (the master merges the partial results). For most 
aggregates (e.g. Sum, Min, Max), merge is just the same as compute, except over 
the partial results. However, for Mean in particular, the compute phase will 
actually compute the sum and count separately, and the merge phase will be the sum 
of partial sums divided by the sum of partial counts. Count(distinct) will be merged 
by Sum as well. There may be other special cases I’m not thinking of, but that’s 
the main idea.

Sasha

On July 6, 2023, at 19:29, Jiangtao Peng <[email protected]> wrote:

Hi Sasha,

Thanks for your reply!

Maybe a Shuffle node is enough for data distribution. But how about the aggregation
node? For example, the mean kernel with grouping maintains a sum and a count value for
each group. To merge the mean results, the master node needs the sum and count values
from each partition. But the mean kernel does not seem to provide a method to export
these sum and count values, nor a method to load them back for a further merge. Would
it be feasible to provide export/load methods on the aggregation kernels? Any other
tips would be appreciated.
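
The closest workaround I can see with today's kernels is to materialize the partial
state myself as an ordinary table, using the existing `hash_sum` and `hash_count`
group-by kernels, roughly like the sketch below (column names `key` and `x` are
placeholders):

#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <arrow/api.h>

#include <memory>
#include <utility>

namespace ac = arrow::acero;

// Workaround sketch: materialize the per-group partial state of a grouped mean
// as an ordinary (key, sum, count) table using the existing hash_sum and
// hash_count kernels. Column names "key" and "x" are placeholders.
arrow::Result<std::shared_ptr<arrow::Table>> GroupedMeanPartials(
    std::shared_ptr<arrow::Table> local_data) {
  ac::Declaration plan = ac::Declaration::Sequence({
      {"table_source", ac::TableSourceNodeOptions{std::move(local_data)}},
      {"aggregate",
       ac::AggregateNodeOptions{
           /*aggregates=*/{{"hash_sum", nullptr, "x", "sum"},
                           {"hash_count", nullptr, "x", "count"}},
           /*keys=*/{"key"}}}});
  // The master can concatenate the partial tables from all nodes, re-aggregate
  // both columns with hash_sum by key, and divide sum by count to get the
  // per-group mean.
  return ac::DeclarationToTable(std::move(plan));
}

But this still requires knowing that the mean is internally a sum and a count, which
is what I was hoping to avoid.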

Thanks,
Jiangtao

From: Sasha Krassovsky <[email protected]>
Date: Friday, July 7, 2023 at 10:12 AM
To: [email protected] <[email protected]>
Subject: Re: [C++][Acero] can Acero support distributed computation?
Hi Jiangtao,
Acero doesn’t support any distributed computation on its own. However, to get 
some simple distributed computation going it would be sufficient to add a 
Shuffle node. For example for Aggregation, the Shuffle would assign a range of 
hashes to each node, and then each node would hash-partition its batches 
locally and send each partition to be aggregated on the corresponding nodes. 
You’d also need a master node to merge the results afterwards.
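
A minimal sketch of that local hash-partitioning step (assuming a non-null int64 key
column named `key`; a real shuffle node would support arbitrary key types and stream
batches out rather than materializing whole partitions):

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <functional>
#include <memory>
#include <vector>

namespace cp = arrow::compute;

// Assign every row to a node based on the hash of its key, then produce one
// batch per node with the existing Filter kernel. Assumes a non-null int64
// column named "key".
arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> HashPartition(
    const std::shared_ptr<arrow::RecordBatch>& batch, int num_nodes) {
  auto keys =
      std::static_pointer_cast<arrow::Int64Array>(batch->GetColumnByName("key"));

  // Build one boolean selection mask per target node.
  std::vector<arrow::BooleanBuilder> masks(num_nodes);
  for (int64_t i = 0; i < keys->length(); ++i) {
    int target = static_cast<int>(std::hash<int64_t>{}(keys->Value(i)) % num_nodes);
    for (int p = 0; p < num_nodes; ++p) {
      ARROW_RETURN_NOT_OK(masks[p].Append(p == target));
    }
  }

  // Apply each mask to the batch; partitions[i] would be sent to node i.
  std::vector<std::shared_ptr<arrow::RecordBatch>> partitions;
  for (int p = 0; p < num_nodes; ++p) {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> mask, masks[p].Finish());
    ARROW_ASSIGN_OR_RAISE(arrow::Datum filtered, cp::Filter(batch, mask));
    partitions.push_back(filtered.record_batch());
  }
  return partitions;
}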

In general the Shuffle-by-hash scheme works well for relational queries where 
order doesn’t matter, but the time series functionality (i.e. as-of-join) 
wouldn’t work as well.

Hope this helps!
Sasha Krassovsky





On July 6, 2023, at 19:04, Jiangtao Peng <[email protected]> wrote:

Hi there,

I have been learning about the Acero streaming execution engine recently, and I’m
wondering whether Acero supports distributed computing.

I have read the code of the aggregation node and kernels; the aggregation kernels seem
to hide the details of the aggregation's intermediate state. If we run the Acero
execution engine on multiple nodes, how can the aggregation tasks be split across
them?

If the current execution engine does not support distributed computing, then, taking
aggregation as an example, how would you plan to adapt the aggregate kernels to
support distributed computation?

Any help or tips would be appreciated.

Thanks,
Jiangtao
