Yes, what you’ve said is correct for Mean. But my earlier point was that there should be only a few such special cases. A simple case would be, e.g., Max, where Aggregate outputs Max and then merge outputs Max(Max), i.e. the max of the partial maxes.
Sasha

> On July 6, 2023, at 23:13, Jiangtao Peng <[email protected]> wrote:
>
> Sorry for my unclear expression.
>
> Take mean aggregation as an example: does Aggregate "output" the sum and
> count values, and does Accumulate then "input" the sum and count values and
> "merge" sum(sum)/sum(count) as its "output"?
> My point is how to implement Pre-Aggregation and Post-Aggregation using Acero.
>
> Best,
> Jiangtao
>
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 1:25 PM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
>
> Can you clarify what you mean by “data flow”? Each machine will be executing
> the same query plan. The query plan will contain an operator called Shuffle,
> and above the Shuffle will be an Aggregate, and above that will be an
> Accumulate node. The SourceNode will read data from disk on each machine. The
> data will then be put through the ShuffleNode, partitioned, and sent to other
> machines. The other machines will perform their local aggregations until they
> all agree that input is finished. Next the data on each machine will be input
> into the Accumulate node, which ships it to the master. The master will then
> perform the merge on all of the data it receives from all the participating
> machines.
>
> Sasha
>
> On July 6, 2023, at 22:17, Jiangtao Peng <[email protected]> wrote:
>
> Sure, distributed aggregations can be split into two phases, “compute” and
> “merge”. But what about the data flow of “compute” and “merge” on different
> nodes?
>
> Best,
> Jiangtao
>
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 11:07 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
>
> Distributed aggregations have two phases: “compute” (each node does its own
> aggregation) and “merge” (the master merges the partial results). For most
> aggregates (e.g. Sum, Min, Max), merge is just the same as compute, except
> over the partial results. However, for Mean in particular, the compute phase
> will actually be sum and count separately, and the merge phase will be sum of
> partial sums divided by sum of partial counts. Count(distinct) will be merged
> by Sum as well. There may be other special cases I’m not thinking of, but
> that’s the main idea.
>
> Sasha
>
> On July 6, 2023, at 19:29, Jiangtao Peng <[email protected]> wrote:
>
> Hi Sasha,
>
> Thanks for your reply!
>
> Maybe a Shuffle node is enough for data distribution. What about the
> aggregation node? For example, the mean kernel with groups maintains a sum
> and a count value for each group. A master node merging mean results needs
> the sum and count values from each partition. But the mean kernel does not
> seem to support a method to export these sum and count values, and it also
> doesn't support a method to load them for an additional merge. Is it
> feasible to provide export/load methods on aggregation kernels? Any other
> tips would be appreciated.
>
> Thanks,
> Jiangtao
>
> From: Sasha Krassovsky <[email protected]>
> Date: Friday, July 7, 2023 at 10:12 AM
> To: [email protected] <[email protected]>
> Subject: Re: [C++][Acero] can Acero support distributed computation?
>
> Hi Jiangtao,
> Acero doesn’t support any distributed computation on its own. However, to get
> some simple distributed computation going it would be sufficient to add a
> Shuffle node. For example, for Aggregation, the Shuffle would assign a range
> of hashes to each node, and then each node would hash-partition its batches
> locally and send each partition to be aggregated on the corresponding nodes.
> You’d also need a master node to merge the results afterwards.
>
> In general the Shuffle-by-hash scheme works well for relational queries where
> order doesn’t matter, but the time series functionality (i.e. as-of-join)
> wouldn’t work as well.
>
> Hope this helps!
> Sasha Krassovsky
>
> On July 6, 2023, at 19:04, Jiangtao Peng <[email protected]> wrote:
>
> Hi there,
>
> I've recently been learning the Acero streaming execution engine, and I’m
> wondering if Acero supports distributed computing.
>
> I have read the code for the aggregation node and kernels; the aggregation
> kernels seem to hide the details of the intermediate aggregation state. If
> multiple nodes are used with the Acero execution engine, how should
> aggregation tasks be split?
>
> If the current execution engine does not support distributed computing,
> taking aggregation as an example, how would you plan to transform the
> aggregate kernel to support distributed computation?
>
> Any help or tips would be appreciated.
>
> Thanks,
> Jiangtao
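
The two-phase scheme discussed in this thread can be illustrated with a minimal sketch in plain Python. This is not Acero code: the function names `compute_partial`, `merge_partials`, and `finalize` are hypothetical, and each "worker" is just a list of (group key, value) rows. It only shows why Max can be merged with Max while Mean needs sum and count carried as partial state.

```python
def compute_partial(rows):
    """Compute phase (hypothetical): one worker builds per-group partial state.

    The state is (sum, count, max), not the mean itself, so it can be merged.
    """
    state = {}
    for key, value in rows:
        if key in state:
            s, c, m = state[key]
            state[key] = (s + value, c + 1, max(m, value))
        else:
            state[key] = (value, 1, value)
    return state


def merge_partials(partials):
    """Merge phase (hypothetical): the master combines all workers' states.

    Sum and count are merged by summing; max is merged by taking the max.
    """
    merged = {}
    for partial in partials:
        for key, (s, c, m) in partial.items():
            if key in merged:
                ms, mc, mm = merged[key]
                merged[key] = (ms + s, mc + c, max(mm, m))
            else:
                merged[key] = (s, c, m)
    return merged


def finalize(merged):
    """Final step: mean = sum of partial sums / sum of partial counts."""
    return {key: {"mean": s / c, "max": m} for key, (s, c, m) in merged.items()}


# Two workers holding different slices of the same groups.
worker_a = [("x", 1.0), ("x", 3.0), ("y", 10.0)]
worker_b = [("x", 8.0), ("y", 20.0), ("y", 30.0)]

partials = [compute_partial(worker_a), compute_partial(worker_b)]
result = finalize(merge_partials(partials))
print(result)
```

For group "x" the workers' local means are 2.0 and 8.0, so averaging the means would give 5.0, whereas merging sums and counts gives 12.0 / 3 = 4.0, which is the mean over all rows. That difference is why Mean is one of the special cases and Max is not.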
