Can you clarify what you mean by “data flow”? Each machine will be executing 
the same query plan. The query plan will contain an operator called Shuffle, 
and above the Shuffle will be an Aggregate, and above that will be an 
Accumulate node. The SourceNode will read data from disk on each machine. The 
data will then be put through the ShuffleNode, partitioned, and sent to other 
machines. Each machine will aggregate the rows it receives until all machines 
agree that input is finished. Next, the data on each machine will be input 
into the Accumulate node, which ships it to the master. The master will then 
perform the merge on all of the data it receives from all the participating 
machines. 
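
To make the flow concrete, here is a minimal single-process sketch in Python. 
It is not Acero code: the function names (owner, shuffle, compute_partial, 
merge_mean, distributed_mean) are hypothetical, machines are simulated as 
plain lists, and Mean is represented by a (sum, count) pair per group. 

```python
import zlib
from collections import defaultdict


def owner(key, num_machines):
    # Hypothetical partitioning rule: a stable hash of the key, modulo the
    # number of machines. crc32 is used so results do not vary between runs.
    return zlib.crc32(str(key).encode()) % num_machines


def shuffle(local_rows_per_machine):
    """Route every (key, value) row to the machine that owns its key."""
    n = len(local_rows_per_machine)
    inboxes = [[] for _ in range(n)]
    for rows in local_rows_per_machine:
        for key, value in rows:
            inboxes[owner(key, n)].append((key, value))
    return inboxes


def compute_partial(rows):
    """'Compute' phase on one machine: per-group (sum, count) state."""
    state = defaultdict(lambda: [0.0, 0])
    for key, value in rows:
        state[key][0] += value
        state[key][1] += 1
    return {key: (s, c) for key, (s, c) in state.items()}


def merge_mean(partials):
    """'Merge' phase on the master: sum of sums over sum of counts."""
    total = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, c) in partial.items():
            total[key][0] += s
            total[key][1] += c
    return {key: s / c for key, (s, c) in total.items()}


def distributed_mean(local_rows_per_machine):
    inboxes = shuffle(local_rows_per_machine)                # Shuffle
    partials = [compute_partial(rows) for rows in inboxes]   # Aggregate
    return merge_mean(partials)                              # Accumulate + merge


if __name__ == "__main__":
    machines = [
        [("a", 1.0), ("b", 10.0)],
        [("a", 3.0), ("b", 20.0), ("c", 5.0)],
    ]
    print(distributed_mean(machines))
```

In this sketch the shuffle places each key on exactly one machine, so the 
master's merge only combines disjoint groups; the same merge would also be 
correct if a group's rows were spread over several machines. 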

Sasha 

> On July 6, 2023, at 22:17, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Sure, distributed aggregations can be split into two phases, “compute” and 
> “merge”. But what does the data flow between “compute” and “merge” look like 
> across different nodes?
>  
>  
> Best,
> Jiangtao
>  
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 11:07 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
> 
> Distributed aggregations have two phases: “compute” (each node does its own 
> aggregation) and “merge” (the master merges the partial results). For most 
> aggregates (e.g. Sum, Min, Max), merge is just the same as compute, except 
> over the partial results. However, for Mean in particular, the compute phase 
> will actually be sum and count separately, and the merge phase will be sum of 
> partial sums divided by sum of partial counts. Count(distinct) will be merged 
> by Sum as well. There may be other special cases I’m not thinking of, but 
> that’s the main idea. 
>  
> Sasha 
>  
> On July 6, 2023, at 19:29, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Hi Sasha,
>  
> Thanks for your reply!
>  
> Maybe a Shuffle node is enough for data distribution. But what about the 
> aggregation node? For example, the grouped mean kernel maintains sum and 
> count values for each group. A master node merging mean results needs the 
> sum and count values from each partition. But the mean kernel does not seem 
> to provide a method to export these sum and count values, nor a method to 
> load them for an additional merge. Is it feasible to provide export/load 
> methods on aggregation kernels? Any other tips would be appreciated.
>  
> Thanks,
> Jiangtao
>  
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 10:12 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
> 
> Hi Jiangtao,
> Acero doesn’t support any distributed computation on its own. However, to get 
> some simple distributed computation going it would be sufficient to add a 
> Shuffle node. For example, for Aggregation, the Shuffle would assign a range 
> of hashes to each node, and then each node would hash-partition its batches 
> locally and send each partition to be aggregated on the corresponding node. 
> You’d also need a master node to merge the results afterwards. 
>  
> In general the Shuffle-by-hash scheme works well for relational queries where 
> order doesn’t matter, but the time series functionality (i.e. as-of-join) 
> wouldn’t work as well. 
>  
> Hope this helps! 
> Sasha Krassovsky 
> 
> 
> 
> On July 6, 2023, at 19:04, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Hi there,
> 
> I've recently been learning about the Acero streaming execution engine, and 
> I’m wondering whether Acero supports distributed computing.
>  
> I have read the code for the aggregation node and kernels; the aggregation 
> kernels seem to hide the details of the intermediate aggregation state. When 
> using multiple nodes with the Acero execution engine, how should aggregation 
> tasks be split?
>  
> If the current execution engine does not support distributed computing, 
> then, taking aggregation as an example, how would you plan to change the 
> aggregate kernel to support distributed computation?
>  
> Any help or tips would be appreciated.
> 
> Thanks,
> Jiangtao
