Yes, what you’ve said is correct for Mean. But my earlier point is that there 
should only be a few such special cases. A simple case would be e.g. Max, 
where Aggregate outputs Max and then the merge outputs Max(Max). 
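
A minimal sketch of the two cases, in plain Python rather than Acero code. 
All names here (MeanState, compute_max, merge_max, compute_mean, merge_mean) 
are hypothetical and only illustrate the compute/merge split; they are not 
Acero or Arrow APIs.

```python
from dataclasses import dataclass


@dataclass
class MeanState:
    """Partial state for Mean: a running sum and a row count."""
    total: float
    count: int


def compute_max(values):
    # Compute phase for Max: each machine reduces its own partition.
    return max(values)


def merge_max(partials):
    # Merge phase for Max is the same function over the partial results.
    return max(partials)


def compute_mean(values):
    # Compute phase for Mean: emit sum and count, not the mean itself.
    return MeanState(total=sum(values), count=len(values))


def merge_mean(partials):
    # Merge phase for Mean: sum of partial sums over sum of partial counts.
    total = sum(p.total for p in partials)
    count = sum(p.count for p in partials)
    return total / count


# Three partitions standing in for three machines (made-up data).
partitions = [[1.0, 5.0, 3.0], [4.0, 2.0], [6.0]]

global_max = merge_max([compute_max(p) for p in partitions])
global_mean = merge_mean([compute_mean(p) for p in partitions])

# Averaging the per-partition means would weight partitions equally and
# give a different answer, which is why Mean needs the (sum, count) state.
mean_of_means = sum(sum(p) / len(p) for p in partitions) / len(partitions)
```

With this data the merged mean differs from the mean of the per-partition 
means, because the partitions have different sizes.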

Sasha 

> On July 6, 2023, at 23:13, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Sorry for being unclear.
>  
> Taking mean aggregation as an example: does Aggregate "output" the sum and 
> count values, and does Accumulate then "input" those sum and count values 
> and "merge" them as sum(sum)/sum(count) for its "output"?
> My question is how to implement pre-aggregation and post-aggregation using Acero.
>  
> Best,
> Jiangtao
>  
>  
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 1:25 PM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
> 
> Can you clarify what you mean by “data flow”? Each machine will be executing 
> the same query plan. The query plan will contain an operator called Shuffle, 
> and above the Shuffle will be an Aggregate, and above that will be an 
> Accumulate node. The SourceNode will read data from disk on each machine. The 
> data will then be put through the ShuffleNode, partitioned, and sent to other 
> machines. The other machines will perform their local aggregations until they 
> all agree that input is finished. Next the data on each machine will be input 
> into the Accumulate node, which ships it to the master. The master will then 
> perform the merge on all of the data it receives from all the participating 
> machines. 
>  
> Sasha 
> 
> 
> On July 6, 2023, at 22:17, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Sure, distributed aggregations can be split into two phases, “compute” and 
> “merge”. But how does data flow between “compute” and “merge” on different nodes?
>  
>  
> Best,
> Jiangtao
>  
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 11:07 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
> 
> Distributed aggregations have two phases: “compute” (each node does its own 
> aggregation) and “merge” (the master merges the partial results). For most 
> aggregates (e.g. Sum, Min, Max), merge is just the same as compute, except 
> over the partial results. However, for Mean in particular, the compute phase 
> will actually be sum and count separately, and the merge phase will be sum of 
> partial sums divided by sum of partial counts. Count(distinct) will be merged 
> by Sum as well. There may be other special cases I’m not thinking of, but 
> that’s the main idea. 
>  
> Sasha 
>  
> On July 6, 2023, at 19:29, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Hi Sasha,
>  
> Thanks for your reply!
>  
> Maybe a Shuffle node is enough for data distribution. What about the 
> aggregation node? For example, the grouped mean kernel maintains sum and 
> count values for each group. A master node merging mean results needs the sum 
> and count values from each partition. But the mean kernel does not seem to 
> support a method to export these sum and count values, nor a method to load 
> them for an additional merge. Would it be feasible to provide export/load 
> methods on the aggregation kernels? Any other tips would be appreciated.
>  
> Thanks,
> Jiangtao
>  
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 10:12 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
> 
> Hi Jiangtao,
> Acero doesn’t support any distributed computation on its own. However, to get 
> some simple distributed computation going it would be sufficient to add a 
> Shuffle node. For example for Aggregation, the Shuffle would assign a range 
> of hashes to each node, and then each node would hash-partition its batches 
> locally and send each partition to be aggregated on the corresponding nodes. 
> You’d also need a master node to merge the results afterwards. 
>  
> In general the Shuffle-by-hash scheme works well for relational queries where 
> order doesn’t matter, but the time series functionality (i.e. as-of-join) 
> wouldn’t work as well. 
>  
> Hope this helps! 
> Sasha Krassovsky 
> 
> 
> 
> 
> On July 6, 2023, at 19:04, Jiangtao Peng <[email protected]> wrote:
> 
> 
> Hi there,
> 
> I've been learning the Acero streaming execution engine recently, and I’m 
> wondering if Acero supports distributed computing.
>  
> I have read the code for the aggregation node and kernels; the aggregation 
> kernels seem to hide the details of the intermediate aggregation state. When 
> using multiple nodes with the Acero execution engine, how should aggregation 
> tasks be split?
>  
> If the current execution engine does not support distributed computing, then 
> taking aggregation as an example, how would you plan to change the aggregate 
> kernels to support distributed computation?
>  
> Any help or tips would be appreciated.
> 
> Thanks,
> Jiangtao
