Hi,

I'm working on a batch job (roughly 10 billion records of input, 10
million groups) that is essentially a 'fold' over each group, that is, I
have a function

AggregateT addToAggregate(AggregateT agg, RecordT record) { ... }

and want to fold this over each group in my DataSet.

My understanding is that I cannot use .groupBy(0).reduce(...), since a
ReduceFunction only covers the case where AggregateT is the same type as
RecordT.
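
For reference, the relevant interface from the Flink DataSet API is
roughly the following; both inputs and the result share the single type
parameter T, so the accumulator would have to be a RecordT as well:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}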

A simple solution using .reduceGroup(...) works, but spills all input
data in the reduce step, which causes a lot of slow and expensive disk I/O.
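
For concreteness, this is roughly what that straightforward version looks
like. The types are placeholders: I'm pretending the records are
Tuple2<Long, Double> keyed on field 0, AggregateT is a small POJO, and the
fold just sums and counts for illustration.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FoldPerGroup {

    // stand-in for my AggregateT (a Flink POJO so it can also be used as a key)
    public static class AggregateT {
        public long key;
        public double sum;
        public long count;
    }

    public static DataSet<AggregateT> foldGroups(DataSet<Tuple2<Long, Double>> records) {
        return records
            .groupBy(0)
            .reduceGroup(new GroupReduceFunction<Tuple2<Long, Double>, AggregateT>() {
                @Override
                public void reduce(Iterable<Tuple2<Long, Double>> group,
                                   Collector<AggregateT> out) {
                    AggregateT agg = new AggregateT();
                    for (Tuple2<Long, Double> record : group) {
                        // addToAggregate(agg, record)
                        agg.key = record.f0;
                        agg.sum += record.f1;
                        agg.count++;
                    }
                    out.collect(agg);
                }
            });
    }
}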

Therefore, we tried using .combineGroup(...).reduceGroup(...), but saw a
similar amount of spilling. Checking the source of the *Combine drivers,
it seems that they accumulate records in a buffer, sort the buffer by
key, and combine adjacent records belonging to the same group. This does
not work in my case because of the large number of groups: the records in
the buffer almost all belong to different groups. The "combine" phase
therefore degenerates into a map that turns each single RecordT into an
AggregateT, and the reduce phase still has 10 billion AggregateTs to merge.
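
Roughly what we tried looks like this (same placeholder types as in the
sketch above; the re-grouping on the key before the final reduceGroup and
the merge logic are made up here for illustration):

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class CombineThenReduce {

    public static DataSet<AggregateT> foldGroups(DataSet<Tuple2<Long, Double>> records) {
        return records
            .groupBy(0)
            .combineGroup(new GroupCombineFunction<Tuple2<Long, Double>, AggregateT>() {
                @Override
                public void combine(Iterable<Tuple2<Long, Double>> group,
                                    Collector<AggregateT> out) {
                    AggregateT agg = new AggregateT();
                    for (Tuple2<Long, Double> record : group) {
                        // addToAggregate(agg, record)
                        agg.key = record.f0;
                        agg.sum += record.f1;
                        agg.count++;
                    }
                    out.collect(agg);
                }
            })
            .groupBy("key")
            .reduceGroup(new GroupReduceFunction<AggregateT, AggregateT>() {
                @Override
                public void reduce(Iterable<AggregateT> partials,
                                   Collector<AggregateT> out) {
                    AggregateT merged = null;
                    for (AggregateT partial : partials) {
                        if (merged == null) {
                            // take the first partial as the running result
                            merged = partial;
                        } else {
                            merged.sum += partial.sum;
                            merged.count += partial.count;
                        }
                    }
                    out.collect(merged);
                }
            });
    }
}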

Is there a way of modelling this computation efficiently with the
DataSet API? Alternatively, can I turn this into a DataStream job? (The
implementation there would simply be a MapFunction on a KeyedStream with
the AggregateT residing in keyed state, although I don't know how I
would emit this state only once, at the end of the stream.)
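
To make the DataStream idea concrete, this is roughly what I have in mind
(again with the placeholder types from above, and using a
RichFlatMapFunction rather than a plain MapFunction so that nothing has to
be emitted per record); the part I don't know how to do is marked in the
last comment:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class KeyedFold extends RichFlatMapFunction<Tuple2<Long, Double>, AggregateT> {

    private transient ValueState<AggregateT> aggState;

    @Override
    public void open(Configuration parameters) {
        aggState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("agg", AggregateT.class));
    }

    @Override
    public void flatMap(Tuple2<Long, Double> record, Collector<AggregateT> out)
            throws Exception {
        AggregateT agg = aggState.value();
        if (agg == null) {
            agg = new AggregateT();
            agg.key = record.f0;
        }
        // addToAggregate(agg, record)
        agg.sum += record.f1;
        agg.count++;
        aggState.update(agg);
        // deliberately not collecting anything per record; this is exactly the
        // part I'm unsure about: how to emit the final aggregate once per key
        // when the input is exhausted
    }
}

The job would then be something like
records.keyBy(...).flatMap(new KeyedFold()), but as said, I'm missing the
end-of-input trigger.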

Thanks,
Urs

-- 
Urs Schönenberger
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082
